From aab101bd1c570b70d5660ca80b72607754d61afe Mon Sep 17 00:00:00 2001 From: "Sam G." Date: Tue, 14 May 2024 06:37:08 -0700 Subject: [PATCH] add callback timeouts, secondary thread pools, and frame directives to Router --- .gitignore | 2 +- execlog/router.py | 603 +++++++++++++++++++++++++++++++------- execlog/syncers/router.py | 40 ++- execlog/util/generic.py | 7 + tests/test_router.py | 5 +- tests/test_server.py | 7 +- 6 files changed, 552 insertions(+), 112 deletions(-) diff --git a/.gitignore b/.gitignore index d46023a..78c4f94 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # generic py __pycache__/ .pytest_cache/ -localsys.egg-info/ +*.egg-info/ .ipynb_checkpoints/ .python-version diff --git a/execlog/router.py b/execlog/router.py index 7e5fdfb..f58be06 100644 --- a/execlog/router.py +++ b/execlog/router.py @@ -5,8 +5,10 @@ import time import asyncio import logging import inspect -import traceback +import traceback import threading +import concurrent +from enum import Enum from pathlib import Path from typing import Any, Callable from collections import defaultdict @@ -18,11 +20,24 @@ from tqdm.auto import tqdm from execlog.event import Event from execlog.listener import Listener -from execlog.util.generic import color_text +from execlog.util.generic import color_text, get_func_name logger = logging.getLogger(__name__) +class FrameDirective(Enum): + ''' + Indicates frame-level behavior when a callback fails. + ''' + CONTINUE_WITHOUT = 1 + CANCEL_FRAME = 2 + +class CallbackTimeoutError(Exception): + ... + +class CancelledFrameError(Exception): + ... + class Router[E: Event]: ''' Route events to registered callbacks @@ -78,6 +93,89 @@ class Router[E: Event]: completely for an event to be fully handled, later jobs may complete before earlier ones, or interact with intermediate disk states (raw file writes, DB inserts, etc), before the earliest call has had a chance to clean up. + + .. admonition:: Details behind the threaded model and future management + + Routers kick off execution in response to *events*. These events are received via + ``.submit``, and the following process is kick-started: + + 1. Each event is wrapped in its own ``.submit_event`` call and submitted as a task + to the *primary* thread pool. Let $E$ be the set of events, and $|E|$ be the + number of events. ``.submit`` exits as soon as these $|E|$ tasks are enqueued, + not waiting for the completion of the corresponding worker threads. There are + now $|E|$ tier-I tasks waiting to be started by the router's primary thread + pool, with states set to "pending." + 2. The primary thread pool begins running the enqueued ``.submit_event`` calls + concurrently using allocated resources (e.g., four threads). Each + ``.submit_event`` call matches the associated event $e$ to $c_e$ callbacks, + according to the registered routes. These callbacks are each individually + submitted to the *secondary* thread pool as tier-II tasks and waited upon + *within* the ``.submit_event`` call. This thread pool separation prevents + deadlocks which would otherwise be an issue if submitting both tier-I and + tier-II tasks to the same thread pool. Tier-I tasks that have begun this + process are in the "running" state, and the submitted tier-II futures are now + "pending." + 3. Once the $c_e$ callbacks for event $e$ are completed (which are tier-II tasks + being waited upon within a tier-I task), their done-callbacks will be invoked + within "a thread belonging to the process that added them" (with + ``wait_on_event_callbacks`` calling through to ``submit_callback``, which + attaches ``general_task_done``). Where these done callbacks are executed + varies based on a few conditions. See + + https://stackoverflow.com/a/26021772/4573915 + + for a great breakdown on this. The gist is the following: + + a. If the callback is attached to a future that is already cancelled or + completed, it will be invoked immediately in the current thread doing the + attaching. + b. If the future is queued/pending and successfully cancelled, the thread + doing the cancelling will immediately invoke all of the future's callbacks. + c. Otherwise, the thread that executes the future's task (could produce either + a successful result or an exception) will invoke the callbacks. + + So if a task completes (i.e., is not cancelled, producing either a result or an + exception), the thread that ran the task will also handle the associated + callbacks. If the task is successfully cancelled (which means it was never + running and never allocated a thread), the cancelling context will handle the + callbacks, and this happens here only in ``.shutdown()`` with the call + ``thread_pool.shutdown(cancel_futures=True)``. + + The results of these futures are made available in this ``submit_event`` + context. Note that these results are dictated by the logic in + ``wait_on_futures``. If a future was successfully cancelled or raised and + exception during execution, it will not have a result to add to this list. + + The *router-level* post-callbacks are then submitted to the secondary thread + pool and awaited in a similar fashion to the individual $c_e$ callbacks. The + results obtained from the event callbacks are passed through to these + "post-callbacks" for possible centralized processing. + 4. Once all post-callbacks have completed (along with *their* attached + "done-callbacks," which are just ``.general_task_done`` checks handled in the + executor thread that ran the post-callback), finally the tier-I + ``.submit_event`` future can be marked completed (either with a successfully + attached result or an exception), and its attached "done-callbacks" will be + ran the in the same tier-I thread that handled the task (which is + ``general_task_done`` and ``clear_event``, in that order). + + - General behaviors and additional remarks: + * Thread pool futures can only be cancelled prior to "running" or "done" states. + Both successful cancellation or completion trigger a future's done-callbacks, + which will be executed in one a few possible contexts depending several + conditions, as detailed above. + * Tier-I tasks have ``clear_event`` attached as a done-callback, which tracks + the task result and resubmits the event if valid (successfully debounced) + repeat requests were received while the event has been handled. + * Both tier-I and tier-II callbacks have a ``.general_task_done`` callback, which + attempts to retrieve the future result if it wasn't cancelled (if it was, this + retrieval would raise a ``CancelledError``). If it wasn't cancelled but an + exception was raised during execution, this same exception will be re-raised + and re-caught, logged as an error, and exit "cleanly" (since job failures + shouldn't throw off the entire process). A successful result retrieval will + have no effect. + + - On interrupts, exception handling, and future cancellation: + * ''' listener_cls = Listener[E] @@ -107,6 +205,7 @@ class Router[E: Event]: # store prepped (e.g., delayed) callbacks self.callback_registry = {} + self.callback_start_times = {} # track event history self.event_log = [] @@ -115,14 +214,23 @@ class Router[E: Event]: self.should_exit = False self._active_futures = set() - self._thread_pool = None + self._thread_pool_1 = None + self._thread_pool_2 = None self._route_lock = threading.Lock() @property - def thread_pool(self): - if self._thread_pool is None: - self._thread_pool = ThreadPoolExecutor(max_workers=self.workers) - return self._thread_pool + def primary_thread_pool(self): + '''Handle tier-I futures.''' + if self._thread_pool_1 is None: + self._thread_pool_1 = ThreadPoolExecutor(max_workers=self.workers) + return self._thread_pool_1 + + @property + def secondary_thread_pool(self): + '''Handle tier-II futures.''' + if self._thread_pool_2 is None: + self._thread_pool_2 = ThreadPoolExecutor(max_workers=self.workers) + return self._thread_pool_2 def register( self, @@ -131,6 +239,8 @@ class Router[E: Event]: pattern, debounce=200, delay=10, + callback_timeout=None, + condition=FrameDirective.CONTINUE_WITHOUT, **listener_kwargs, ): ''' @@ -157,8 +267,17 @@ class Router[E: Event]: ``filter(...)``) debounce: delay: + callback_timeout: timeout for waiting ''' - route_tuple = (callback, pattern, debounce, delay, listener_kwargs) + route_tuple = ( + callback, + pattern, + debounce, + delay, + callback_timeout, + condition, + listener_kwargs + ) self.routemap[endpoint].append(route_tuple) def submit(self, events: E | list[E], callbacks: list[Callable] | None = None): @@ -178,7 +297,13 @@ class Router[E: Event]: return futures - def submit_event(self, event: E, callbacks: list[Callable] | None = None): + def submit_event( + self, + event : E, + callbacks : list[Callable] | None = None, + timeouts : list[int|float] | None = None, + conditions : list[FrameDirective] | None = None, + ) -> list: ''' Group up and submit all matching callbacks for ``event``. All callbacks are ran concurrently in their own threads, and this method blocks until all are completed. @@ -198,7 +323,7 @@ class Router[E: Event]: # this may be split across threads mid-check, preventing one thread from # handling the blocking of the entire group with self._route_lock: - callbacks = self.matching_routes(event) + callbacks, timeouts, conditions = self.matching_routes(event) # stop early if no work to do if len(callbacks) == 0: @@ -210,34 +335,32 @@ class Router[E: Event]: self.queue_callbacks(event_idx, callbacks) return [] + # TODO: Chesterton's fence # callbacks now computed, flush the running event # note: a separate thread could queue valid callbacks since the running check; # o/w we know the running index is empty - self.running_events[event_idx] = self.running_events[event_idx] + # self.running_events[event_idx] = self.running_events[event_idx] # submit matching callbacks and wait for them to complete - future_results = self.wait_on_callbacks(callbacks, event) + # *may* raise a FrameCancelledError, which we let propagate upward + completed_futures = self.wait_on_event_callbacks( + event, + callbacks, + timeouts=timeouts, + conditions=conditions, + ) # finally call post event-group callbacks (only if some event callbacks were # submitted), wait for them to complete - if future_results: - self.wait_on_futures([ - self.submit_callback(post_callback, event, future_results) + if completed_futures: + wait([ + self.submit_event_callback(post_callback, event, completed_futures)[0] for post_callback in self.post_callbacks ]) - return future_results - - def submit_callback(self, callback: Callable, *args, **kwargs): - ''' - Note: this method is expected to return a future. Perform any event-based - filtering before submitting a callback with this method. - ''' - # exit immediately if exit flag is set - # if self.should_exit: - # return - callback = self.wrap_safe_callback(callback) + return completed_futures + def _submit_with_thread_pool(self, thread_pool, callback: Callable, *args, **kwargs): if inspect.iscoroutinefunction(callback): if self.loop is None: self.loop = asyncio.new_event_loop() @@ -251,15 +374,48 @@ class Router[E: Event]: self.loop, ) else: - future = self.thread_pool.submit( + future = thread_pool.submit( callback, *args, **kwargs ) - self._active_futures.add(future) - future.add_done_callback(self.general_task_done) return future - def matching_routes(self, event: E, event_time=None): + def submit_callback(self, callback: Callable, *args, **kwargs): + future = self._submit_with_thread_pool( + self.primary_thread_pool, + callback, + *args, **kwargs + ) + self._active_futures.add(future) + + return future + + def submit_event_callback(self, callback: Callable, event: E, *args, **kwargs): + ''' + Note: this method is expected to return a future. Perform any event-based + filtering before submitting a callback with this method. + ''' + # exit immediately if exit flag is set + # if self.should_exit: + # return + submitted_time = time.time() + callback = self.wrap_timed_callback(callback, submitted_time) + + future = self._submit_with_thread_pool( + self.secondary_thread_pool, + callback, + event, + *args, **kwargs + ) + future.add_done_callback(self.general_callback) + + return future, submitted_time + + def matching_routes( + self, + event: E, + event_time = None + ) -> tuple[list[Callable], list[int|float]]: ''' Return eligible matching routes for the provided event. @@ -292,12 +448,14 @@ class Router[E: Event]: debounce on the first matching action. ''' matches = [] + timeouts = [] + conditions = [] endpoint = event.endpoint name = event.name #action = tuple(event.action) # should be more general event_time = time.time()*1000 if event_time is None else event_time - for (callback, pattern, debounce, delay, listen_kwargs) in self.routemap[endpoint]: + for (callback, pattern, debounce, delay, ctimeout, condition, listen_kwargs) in self.routemap[endpoint]: #index = (endpoint, name, action, callback, pattern, debounce, delay) index = (endpoint, name, callback, pattern, debounce, delay) @@ -305,9 +463,7 @@ class Router[E: Event]: # reject event continue - callback_name = str(callback) - if hasattr(callback, '__name__'): - callback_name = callback.__name__ + callback_name = get_func_name(callback) name_text = color_text(name, Fore.BLUE) pattern_text = color_text(pattern, Fore.BLUE) @@ -317,6 +473,8 @@ class Router[E: Event]: if self.filter(event, pattern, **listen_kwargs): # note that delayed callbacks are added matches.append(self.get_delayed_callback(callback, delay, index)) + timeouts.append(ctimeout) + conditions.append(condition) # set next debounce self.next_allowed_time[index] = event_time + debounce @@ -331,7 +489,7 @@ class Router[E: Event]: f'Event [{name_text}] {match_text} against [{pattern_text}] under [{endpoint_text}] for [{callback_text}]' ) - return matches + return matches, timeouts, conditions def get_delayed_callback(self, callback: Callable, delay: int|float, index): ''' @@ -356,29 +514,195 @@ class Router[E: Event]: return self.callback_registry[index] - def wait_on_futures(self, futures): + def wait_on_event_callbacks( + self, + event : E, + callbacks : list[Callable], + timeouts : list[int | float | None] | None = None, + conditions : list[FrameDirective | None] | None = None, + ): #, *args, **kwargs): ''' - Block until all futures in ``futures`` are complete. Return collected results as a - list, and log warnings when a future fails. - ''' - future_results = [] - for future in as_completed(futures): - try: - if not future.cancelled(): - future_results.append(future.result()) - except Exception as e: - logger.warning(f"Router callback job failed with exception \"{e}\"") + Waits for event-associated callbacks to complete. - return future_results + Submits each callback in ``callbacks`` to the thread pool (via + ``submit_callback``), passing ``event`` as the only parameter to each. Once + started, the future for callback ``callbacks[i]`` will have ``timeouts[i]`` + seconds to complete. If it has not completed in this time, the future's result is + set to the Timeout exception - def wait_on_callbacks(self, callbacks: list[Callable], event: E, *args, **kwargs): - ''' Overridable by inheriting classes based on callback structure ''' - return self.wait_on_futures([ - self.submit_callback(callback, event, *args, **kwargs) - for callback in callbacks - ]) + if timeouts is None: + timeouts = [None]*len(callbacks) + + if conditions is None: + conditions = [FrameDirective.CONTINUE_WITHOUT]*len(callbacks) + + future_map = {} + timed_futures = set() + for callback, timeout, on_err in zip(callbacks, timeouts, conditions): + # "raw" callback here is the reference that will be indexed in `.callback_start_times` + future, submitted_time = self.submit_event_callback(callback, event) #*args, **kwargs) + future_map[future] = ( + callback, + get_func_name(callback), + future, + submitted_time, + timeout, + on_err + ) + + if timeout is not None: + timed_futures.add(future) + + completed_futures = [] + # iterate while there are some futures not completed (cancelled, or finished w/ + # exception or return value). When completed, a future is removed from `futures`. + while future_map: + min_timeout = float('inf') + expired_futures = [] # actively running but expired + + if timed_futures: + time_now = time.time() + for future in list(timed_futures): + callback, cback_name, future, submitted_time, timeout, on_err = future_map[future] + future_tuple = (future, cback_name, on_err) + callback_id = (callback, submitted_time) + + if future.done(): + timed_futures.remove(future) + continue + + if future.running() and callback_id in self.callback_start_times: + time_running = time_now - self.callback_start_times[callback_id] + time_left = timeout - time_running + + # track running futures that have timed out + if time_left < 0: + expired_futures.append(future_tuple) + continue + else: + # running w/o a start time, or queued. Set time-left to the timeout + time_left = timeout + + # track running future w/ smallest non-zero time left before timeout + min_timeout = min(min_timeout, time_left) + + done_futures = [] + for future in list(future_map.keys()): + _, cback_name, future, _, _, on_err = future_map[future] + future_tuple = (future, cback_name, on_err) + + if future.done(): + # done futures have a set result or exception and now immutable + done_futures.append(future_tuple) # local + completed_futures.append(future) # global + future_map.pop(future) + + done_with_exception = [ + ftuple + for ftuple in done_futures + if ftuple[0].exception() is not None + ] + frame_cancellable = expired_futures + done_with_exception + + cancel_frame = False + # set a timeout exception for all expired futures, and determine + # if any of these timeouts should result in a frame cancellation + for _, _, on_err in frame_cancellable: + if on_err == FrameDirective.CANCEL_FRAME: + cancel_frame = True + break + + # set exceptions on running expired futures + for expired_future, cback_name, _ in expired_futures: + # results are immutable once set; double check doneness + if not expired_future.done(): + expired_future.set_exception( + CallbackTimeoutError( + f'Event callback {cback_name} timed out with condition {on_err}' + ) + ) + + if cancel_frame: + # cancel the active frame. Expired futures have already been handled, and + # we now need to handle running futures (not cancellable) and pending + # futures (cancellable) + + # explicitly cancel queued futures before looping through active futures + for future in list(future_map.keys()): + # cancellable futures not yet started + # can't really be more specific here, sets a CancelledError + future.cancel() + + for future in list(future_map.keys()): + _, cback_name, future, _, _, _ = future_map[future] + + if future.done(): + # includes the expired futures whose exceptions were just set & + # those cancelled, although managing `future_map` isn't needed at + # this stage + future_map.pop(future) + elif future.running(): + # possible memory leak + future.set_exception( + CancelledFrameError( + f'Indirect frame cancellation for event callback {cback_name}' + ) + ) + # attempt to communicate with threaded processes for graceful shutdown + # -> set shutdown flags + ... + + # finally, raise an exception indicating the frame was cancelled + raise CancelledFrameError + + # if no expired futures (or they can be ignored with CONTINUE_WITHOUT), carry + # on. Wait for the future with the least time remaining; we shouldn't need to + # do any of the above future parsing until at least min-timeout more seconds. + timeout = None + if min_timeout < float('inf'): + timeout = min_timeout + + # if any remaining futures can produce CANCELLED FRAMEs, wait only on them. + # This lets us be maximally dynamic: + # -> We respond before the timeout only if the newly completed might CANCEL + # the FRAME, which we want to do as soon as possible. + # -> If not responding early, we wait at most `timeout` seconds to be back and + # checking if the *earliest possible timeout violation* has occurred. If + # so, we want to handle it immediately no matter what so I can set the + # desired Timeout exception as *early as possible*. If I don't, the future + # might finish and set its own result, shutting me out when I would've + # liked to capture the timeout violation. + # -> When CANCEL FRAME futures are available, I don't wait on all pending + # futures as I *don't mind* being late to them. I only need to check + # immediately on CANCEL FRAMEs, and otherwise be back no later than + # `timeout`. If there's no `timeout` I'll wait for the first CANCEL FRAME, + # if no CANCEL FRAME I'll wait up to timeout on all futures, and if neither + # I'll simply wait until all futures complete. + cancel_frame_futures = [ + future + for future, ftuple in future_map.items() + if ftuple[-1] == FrameDirective.CANCEL_FRAME + ] + + return_when = concurrent.futures.ALL_COMPLETED + wait_futures = future_map.keys() + if cancel_frame_futures: + return_when = concurrent.futures.FIRST_COMPLETED + wait_futures = cancel_frame_futures + + # if no active future has a timeout + done, _ = wait( + wait_futures, + timeout=timeout, + return_when=return_when + ) + + # add any trailing completed futures + completed_futures.extend(done) + + return completed_futures def queue_callbacks(self, event_idx, callbacks: list[Callable]): ''' @@ -386,7 +710,7 @@ class Router[E: Event]: ''' self.running_events[event_idx].update(callbacks) - def wrap_safe_callback(self, callback: Callable): + def wrap_timed_callback(self, callback: Callable, submitted_time): ''' Check for shutdown flag and exit before running the callbacks. @@ -394,9 +718,8 @@ class Router[E: Event]: an interrupt is received. ''' def safe_callback(callback, *args, **kwargs): - if self.should_exit: - logger.debug('Exiting early from queued callback') - return + # track when this task actually begins + self.callback_start_times[(callback, submitted_time)] = time.time() return callback(*args, **kwargs) @@ -448,31 +771,67 @@ class Router[E: Event]: def clear_event(self, event: E, future): ''' - Clear an event. Pops the passed event out of ``running_events``, and the request - counter is >0, the event is re-submitted. + Clear an event. Pops the passed event out of ``running_events``, and if the + request counter is >0, the event is re-submitted. This method is attached as a "done" callback to the main event wrapping job ``submit_event``. The ``future`` given to this method is one to which it was attached as this "done" callback. This method should only be called when that ``future`` is finished running (or failed). If any jobs were submitted in the - wrapper task, the future results here should be non-empty. We use this fact to - filter out non-work threads that call this method. Because even the - ``matching_routes`` check is threaded, we can't wait to see an event has no work to - schedule, and thus can't prevent this method being attached as a "done" callback. - The check for results from the passed future allows us to know when in fact a - valid frame has finished, and a resubmission may be on the table. + wrapper task, the future results here should be non-empty (even if the methods + don't return anything; we'll at least have ``[None,...]`` if we scheduled at least + one callback). We use this fact to filter out non-work threads that call this + method. Because even the ``matching_routes`` check is threaded, we can't wait to + see an event has no work to schedule, and thus can't prevent this method being + attached as a "done" callback. The check for results from the passed future allows + us to know when in fact a valid frame has finished, and a resubmission may be on + the table. + + .. admonition:: Why we don't need to worry about resubmission w/ empty results + + Note that, even though we can't check for whether there will be any matching + routes prior to calling ``submit_event`` (we have to wait until we're in that + method, at which point this method will already be attached as a callback), + the event will never be marked as "running" and added to ``running_events``. + This means that we won't queue up any callbacks for the same event while + waiting on it, and then exit early here, never to re-submit. The worry was + that a event response might match 0 callbacks (e.g., due to debouncing) and + return ``[]`` from ``submit_event``, but *while waiting for this to complete*, + the same event is submitted and matches (e.g., debouncing timers now allowing + the event through). This could mean that the repeat event gets queued behind + the event in ``running_events`` and should be resubmitted here as a "queued + callback," but *won't do so* because we exit early if no results are obtained. + This is **not an issue** because the original event (that didn't match any + callbacks) will never be marked as running, and thus never prevent the second + event from itself running if in fact in matches a non-empty callback set. This + means that an empty future result set seen here indicates both 1) that no work + took place, and 2) no conflicting events were prevented from running, and we + can exit early here. ''' - result = None - if not future.cancelled(): - result = future.result() - else: - return None + self._active_futures.remove(future) - # result should be *something* if work was scheduled - if not result: - return None + # result should be *something* if work was scheduled, since `submit_event` wraps + # up futures in a list. If no result, event was never marked active, and don't + # want to resubmit as any duplicates were allowed to start. Attempt to get result, + # returning if it's (None or []) or raised an Exception (possibly a + # FrameCancelledError if there's a frame issue, for a CancelledError if the tier-I + # task was successfully cancelled following a `.shutdown()` call) + try: + if not future.result(): return + except concurrent.futures.CancelledError as e: + #logger.error(f'Tier-I future cancelled') + # do not re-raise; outer context can handle cancellations + return + except CancelledFrameError as e: + # do not re-raise; outer context will manage frame cancellations + return + except Exception as e: + # print traceback for unexpected exception + logger.error(f'Unexpected exception in tier-I future: "{e}"') + traceback.print_exc() + return - self.event_log.append((event, result)) + self.event_log.append((event, future)) queued_callbacks = self.stop_event(event) # resubmit event if some queued work remains @@ -485,28 +844,38 @@ class Router[E: Event]: def event_index(self, event): return event[:2] + def general_callback(self, future): + try: + future.result() + except concurrent.futures.CancelledError as e: + logger.error(f'Tier-II future cancelled; "{e}"') + except CancelledFrameError as e: + logger.error(f'Tier-II frame cancelled; "{e}"') + except Exception as e: + logger.warning(f'Tier-II job failed with unknown exception "{e}"') + def shutdown(self): logger.info(color_text('Router shutdown received', Fore.BLACK, Back.RED)) - self.should_exit = True + + # manually track and cancel pending futures b/c `.shutdown(cancel_futures=True)` + # is misleading, and will cause an outer `as_completed` loop to hang for future in tqdm( list(self._active_futures), - desc=color_text('Cancelling active futures...', Fore.BLACK, Back.RED), + desc=color_text( + f'Cancelling {len(self._active_futures)} pending futures...', + Fore.BLACK, Back.RED), colour='red', ): future.cancel() - if self.thread_pool is not None: - self.thread_pool.shutdown(wait=False) + if self._thread_pool_2 is not None: + # cancel pending futures (i.e., those not started) + self.secondary_thread_pool.shutdown(wait=False) - def general_task_done(self, future): - self._active_futures.remove(future) - try: - if not future.cancelled(): - future.result() - except Exception as e: - logger.error(f"Exception occurred in threaded task: '{e}'") - #traceback.print_exc() + if self._thread_pool_1 is not None: + # cancel pending futures (i.e., those not started) + self.primary_thread_pool.shutdown(wait=False) class ChainRouter[E: Event](Router[E]): @@ -538,7 +907,11 @@ class ChainRouter[E: Event](Router[E]): for endpoint, routelist in router.routemap.items(): self.routemap[endpoint].extend(routelist) - def matching_routes(self, event: E, event_time=None): + def matching_routes( + self, + event: E, + event_time=None + ): ''' Colloquial ``callbacks`` now used as a dict of lists of callbacks, indexed by router, and only having keys for routers with non-empty callback lists. @@ -546,25 +919,51 @@ class ChainRouter[E: Event](Router[E]): if event_time is None: event_time = time.time()*1000 - route_map = {} + callback_map = {} + timeout_map = {} + condition_map = {} for router in self.ordered_routers: - router_matches = router.matching_routes(event, event_time) - if router_matches: - route_map[router] = router_matches + matches, timeouts, conditions = router.matching_routes(event, event_time) + if matches: + callback_map[router] = matches + timeout_map[router] = timeouts + condition_map[router] = conditions - return route_map + return callback_map, timeout_map, condition_map - def wait_on_callbacks(self, callbacks, event: E, *args, **kwargs): + def wait_on_event_callbacks( + self, + event : E, + callbacks : dict[Router, list[Callable]], + timeouts : dict[Router, list[int | float | None]] | None = None, + conditions : dict[Router, list[FrameDirective | None]] | None = None, + ): #, *args, **kwargs): ''' - Note: relies on order of callbacks dict matching that of ``ordered_routers``, which - should happen in ``matching_routes`` + Returns a dictionary mapping from + + Note: relies on order of callback-associated dicts matching that of + ``ordered_routers``, which should happen in ``matching_routes``. + + This method blurs the OOP lines a bit, as we're actually passing dicts rather than + the lists expected by the super class. The parent ``submit_event`` is carefully + designed to be indifferent; use caution when making changes. ''' - results = {} + if timeouts is not None: + timeouts = {} + + if conditions is not None: + conditions = {} + + futures = {} for router, callback_list in callbacks.items(): - router_results = router.submit_event(event, callbacks=callback_list) - results[router] = router_results + futures[router] = router.submit_event( + event, + callback_list, + timeouts=timeouts.get(router), + conditions=conditions.get(router), + ) - return results + return futures def queue_callbacks(self, event_idx, callbacks): for router, callback_list in callbacks.items(): @@ -601,6 +1000,12 @@ class ChainRouter[E: Event](Router[E]): router.extend_listener(listener) return listener + def shutdown(self): + super().shutdown() + + # for router in self.ordered_routers: + # router.shutdown() + # RouterBuilder def route(router, route_group, **route_kwargs): diff --git a/execlog/syncers/router.py b/execlog/syncers/router.py index bbeba96..1708360 100644 --- a/execlog/syncers/router.py +++ b/execlog/syncers/router.py @@ -1,4 +1,5 @@ import logging +import concurrent from pathlib import Path from concurrent.futures import as_completed @@ -10,6 +11,7 @@ from co3.resources import DiskResource from co3 import Differ, Syncer, Database from execlog.event import Event +from execlog.router import CancelledFrameError from execlog.routers import PathRouter from execlog.util.generic import color_text @@ -146,20 +148,44 @@ class PathRouterSyncer(Syncer[Path]): # note: we structure this future waiting like this for the TQDM view results = [] - for future in tqdm( - as_completed(event_futures), + success = 0 + cancelled = 0 + errored = 0 + submitted = len(event_futures) + progress_bar = tqdm( total=len(event_futures), desc=f'Awaiting chunk futures [submitted {len(event_futures)}]' - ): + ) + + for future in as_completed(event_futures): try: - if not future.cancelled(): - results.append(future.result()) + callback_future_list = future.result() + results.append(callback_future_list) + success += 1 + except concurrent.futures.CancelledError as e: + cancelled += 1 + #logger.error(f'Event future cancelled; "{e}"') + except CancelledFrameError as e: + errored += 1 + pass + #logger.error(f'Event frame cancelled; "{e}"') except Exception as e: - logger.warning(f"Sync job failed with exception {e}") + errored += 1 + logger.warning(f'Sync job failed with unknown exception "{e}"') + + suc_txt = color_text(f'{success}', Fore.GREEN) + can_txt = color_text(f'{cancelled}', Fore.YELLOW) + err_txt = color_text(f'{errored}', Fore.RED) + tot_txt = color_text(f'{success+cancelled+errored}', Style.BRIGHT) + progress_bar.set_description( + f'Awaiting chunk futures [{tot_txt} / {submitted} | {suc_txt} {can_txt} {err_txt}]' + ) + progress_bar.update(n=1) + + progress_bar.close() return results def shutdown(self): super().shutdown() self.router.shutdown() - diff --git a/execlog/util/generic.py b/execlog/util/generic.py index e135219..400c42c 100644 --- a/execlog/util/generic.py +++ b/execlog/util/generic.py @@ -8,6 +8,13 @@ from colorama import Fore, Back, Style def color_text(text, *colorama_args): return f"{''.join(colorama_args)}{text}{Style.RESET_ALL}" +def get_func_name(func): + func_name = str(func) + if hasattr(func, '__name__'): + func_name = func.__name__ + + return func_name + class ColorFormatter(logging.Formatter): _format = '%(levelname)-8s :: %(name)s %(message)s' diff --git a/tests/test_router.py b/tests/test_router.py index 30926c0..9b7d9b0 100644 --- a/tests/test_router.py +++ b/tests/test_router.py @@ -1,6 +1,7 @@ import logging from pathlib import Path from functools import partial +from concurrent.futures import wait from execlog import util from execlog import ChainRouter, Event @@ -41,7 +42,7 @@ def test_single_router_submission(): Event(endpoint='endpoint_proxy', name='file3'), ] futures = router2.submit(events) - router2.wait_on_futures(futures) + wait(futures) assert True @@ -52,7 +53,7 @@ def test_chain_router_submission(): Event(endpoint='endpoint_proxy', name='file3'), ] futures = chain_router.submit(events) - chain_router.wait_on_futures(futures) + wait(futures) assert True diff --git a/tests/test_server.py b/tests/test_server.py index 4137def..9794331 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -94,7 +94,8 @@ def test_server_with_listeners(): server.shutdown() thread.join() - # finally check the router event logs - assert router1.event_log[0][1] == ['router1 job success'] - assert router2.event_log[0][1] == ['router2 job success'] + # finally check the router event logs: holds tier-I futures, which hold lists of + # tier-II futures + assert [r.result() for r in router1.event_log[0][1].result()] == ['router1 job success'] + assert [r.result() for r in router2.event_log[0][1].result()] == ['router2 job success']