add callback timeouts, secondary thread pools, and frame directives to Router
This commit is contained in:
		
							parent
							
								
									f9de5cfe4a
								
							
						
					
					
						commit
						5510062148
					
				
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -1,7 +1,7 @@ | |||||||
| # generic py | # generic py | ||||||
| __pycache__/ | __pycache__/ | ||||||
| .pytest_cache/ | .pytest_cache/ | ||||||
| localsys.egg-info/ | *.egg-info/ | ||||||
| .ipynb_checkpoints/ | .ipynb_checkpoints/ | ||||||
| .python-version | .python-version | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -7,6 +7,8 @@ import logging | |||||||
| import inspect | import inspect | ||||||
| import traceback | import traceback | ||||||
| import threading | import threading | ||||||
|  | import concurrent  | ||||||
|  | from enum import Enum | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from typing import Any, Callable | from typing import Any, Callable | ||||||
| from collections import defaultdict | from collections import defaultdict | ||||||
| @ -18,11 +20,24 @@ from tqdm.auto import tqdm | |||||||
| 
 | 
 | ||||||
| from execlog.event import Event | from execlog.event import Event | ||||||
| from execlog.listener import Listener | from execlog.listener import Listener | ||||||
| from execlog.util.generic import color_text | from execlog.util.generic import color_text, get_func_name | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||
| 
 | 
 | ||||||
|  | class FrameDirective(Enum): | ||||||
|  |     ''' | ||||||
|  |     Indicates frame-level behavior when a callback fails. | ||||||
|  |     ''' | ||||||
|  |     CONTINUE_WITHOUT = 1 | ||||||
|  |     CANCEL_FRAME = 2 | ||||||
|  | 
 | ||||||
|  | class CallbackTimeoutError(Exception): | ||||||
|  |     ... | ||||||
|  | 
 | ||||||
|  | class CancelledFrameError(Exception): | ||||||
|  |     ... | ||||||
|  | 
 | ||||||
| class Router[E: Event]: | class Router[E: Event]: | ||||||
|     ''' |     ''' | ||||||
|     Route events to registered callbacks |     Route events to registered callbacks | ||||||
| @ -78,6 +93,89 @@ class Router[E: Event]: | |||||||
|         completely for an event to be fully handled, later jobs may complete before |         completely for an event to be fully handled, later jobs may complete before | ||||||
|         earlier ones, or interact with intermediate disk states (raw file writes, DB |         earlier ones, or interact with intermediate disk states (raw file writes, DB | ||||||
|         inserts, etc), before the earliest call has had a chance to clean up. |         inserts, etc), before the earliest call has had a chance to clean up. | ||||||
|  | 
 | ||||||
|  |     .. admonition:: Details behind the threaded model and future management | ||||||
|  | 
 | ||||||
|  |         Routers kick off execution in response to *events*. These events are received via | ||||||
|  |         ``.submit``, and the following process is kick-started: | ||||||
|  | 
 | ||||||
|  |         1. Each event is wrapped in its own ``.submit_event`` call and submitted as a task | ||||||
|  |            to the *primary* thread pool. Let $E$ be the set of events, and $|E|$ be the | ||||||
|  |            number of events. ``.submit`` exits as soon as these $|E|$ tasks are enqueued, | ||||||
|  |            not waiting for the completion of the corresponding worker threads. There are | ||||||
|  |            now $|E|$ tier-I tasks waiting to be started by the router's primary thread | ||||||
|  |            pool, with states set to "pending." | ||||||
|  |         2. The primary thread pool begins running the enqueued ``.submit_event`` calls | ||||||
|  |            concurrently using allocated resources (e.g., four threads). Each | ||||||
|  |            ``.submit_event`` call matches the associated event $e$ to $c_e$ callbacks, | ||||||
|  |            according to the registered routes. These callbacks are each individually | ||||||
|  |            submitted to the *secondary* thread pool as tier-II tasks and waited upon | ||||||
|  |            *within* the ``.submit_event`` call. This thread pool separation prevents | ||||||
|  |            deadlocks which would otherwise be an issue if submitting both tier-I and | ||||||
|  |            tier-II tasks to the same thread pool. Tier-I tasks that have begun this | ||||||
|  |            process are in the "running" state, and the submitted tier-II futures are now | ||||||
|  |            "pending." | ||||||
|  |         3. Once the $c_e$ callbacks for event $e$ are completed (which are tier-II tasks | ||||||
|  |            being waited upon within a tier-I task), their done-callbacks will be invoked | ||||||
|  |            within "a thread belonging to the process that added them" (with | ||||||
|  |            ``wait_on_event_callbacks`` calling through to ``submit_callback``, which | ||||||
|  |            attaches ``general_task_done``). Where these done callbacks are executed | ||||||
|  |            varies based on a few conditions. See | ||||||
|  | 
 | ||||||
|  |            https://stackoverflow.com/a/26021772/4573915 | ||||||
|  | 
 | ||||||
|  |            for a great breakdown on this. The gist is the following:  | ||||||
|  | 
 | ||||||
|  |            a. If the callback is attached to a future that is already cancelled or | ||||||
|  |               completed, it will be invoked immediately in the current thread doing the | ||||||
|  |               attaching. | ||||||
|  |            b. If the future is queued/pending and successfully cancelled, the thread | ||||||
|  |               doing the cancelling will immediately invoke all of the future's callbacks. | ||||||
|  |            c. Otherwise, the thread that executes the future's task (could produce either | ||||||
|  |               a successful result or an exception) will invoke the callbacks. | ||||||
|  | 
 | ||||||
|  |            So if a task completes (i.e., is not cancelled, producing either a result or an | ||||||
|  |            exception), the thread that ran the task will also handle the associated | ||||||
|  |            callbacks. If the task is successfully cancelled (which means it was never | ||||||
|  |            running and never allocated a thread), the cancelling context will handle the | ||||||
|  |            callbacks, and this happens here only in ``.shutdown()`` with the call | ||||||
|  |            ``thread_pool.shutdown(cancel_futures=True)``. | ||||||
|  | 
 | ||||||
|  |            The results of these futures are made available in this ``submit_event`` | ||||||
|  |            context. Note that these results are dictated by the logic in | ||||||
|  |            ``wait_on_futures``. If a future was successfully cancelled or raised and | ||||||
|  |            exception during execution, it will not have a result to add to this list. | ||||||
|  | 
 | ||||||
|  |            The *router-level* post-callbacks are then submitted to the secondary thread | ||||||
|  |            pool and awaited in a similar fashion to the individual $c_e$ callbacks. The | ||||||
|  |            results obtained from the event callbacks are passed through to these | ||||||
|  |            "post-callbacks" for possible centralized processing. | ||||||
|  |         4. Once all post-callbacks have completed (along with *their* attached | ||||||
|  |            "done-callbacks," which are just ``.general_task_done`` checks handled in the | ||||||
|  |            executor thread that ran the post-callback), finally the tier-I | ||||||
|  |            ``.submit_event`` future can be marked completed (either with a successfully | ||||||
|  |            attached result or an exception), and its attached "done-callbacks" will be | ||||||
|  |            ran the in the same tier-I thread that handled the task (which is | ||||||
|  |            ``general_task_done`` and ``clear_event``, in that order). | ||||||
|  | 
 | ||||||
|  |         - General behaviors and additional remarks: | ||||||
|  |           * Thread pool futures can only be cancelled prior to "running" or "done" states. | ||||||
|  |             Both successful cancellation or completion trigger a future's done-callbacks, | ||||||
|  |             which will be executed in one a few possible contexts depending several | ||||||
|  |             conditions, as detailed above. | ||||||
|  |           * Tier-I tasks have ``clear_event`` attached as a done-callback, which tracks | ||||||
|  |             the task result and resubmits the event if valid (successfully debounced) | ||||||
|  |             repeat requests were received while the event has been handled. | ||||||
|  |           * Both tier-I and tier-II callbacks have a ``.general_task_done`` callback, which | ||||||
|  |             attempts to retrieve the future result if it wasn't cancelled (if it was, this | ||||||
|  |             retrieval would raise a ``CancelledError``). If it wasn't cancelled but an | ||||||
|  |             exception was raised during execution, this same exception will be re-raised | ||||||
|  |             and re-caught, logged as an error, and exit "cleanly" (since job failures | ||||||
|  |             shouldn't throw off the entire process). A successful result retrieval will | ||||||
|  |             have no effect. | ||||||
|  | 
 | ||||||
|  |         - On interrupts, exception handling, and future cancellation: | ||||||
|  |           *  | ||||||
|     ''' |     ''' | ||||||
|     listener_cls = Listener[E] |     listener_cls = Listener[E] | ||||||
| 
 | 
 | ||||||
| @ -107,6 +205,7 @@ class Router[E: Event]: | |||||||
| 
 | 
 | ||||||
|         # store prepped (e.g., delayed) callbacks |         # store prepped (e.g., delayed) callbacks | ||||||
|         self.callback_registry = {} |         self.callback_registry = {} | ||||||
|  |         self.callback_start_times = {} | ||||||
| 
 | 
 | ||||||
|         # track event history |         # track event history | ||||||
|         self.event_log = [] |         self.event_log = [] | ||||||
| @ -115,14 +214,23 @@ class Router[E: Event]: | |||||||
|         self.should_exit = False |         self.should_exit = False | ||||||
|         self._active_futures = set() |         self._active_futures = set() | ||||||
| 
 | 
 | ||||||
|         self._thread_pool = None |         self._thread_pool_1 = None | ||||||
|  |         self._thread_pool_2 = None | ||||||
|         self._route_lock  = threading.Lock() |         self._route_lock  = threading.Lock() | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def thread_pool(self): |     def primary_thread_pool(self): | ||||||
|         if self._thread_pool is None: |         '''Handle tier-I futures.''' | ||||||
|             self._thread_pool = ThreadPoolExecutor(max_workers=self.workers) |         if self._thread_pool_1 is None: | ||||||
|         return self._thread_pool |             self._thread_pool_1 = ThreadPoolExecutor(max_workers=self.workers) | ||||||
|  |         return self._thread_pool_1 | ||||||
|  |      | ||||||
|  |     @property | ||||||
|  |     def secondary_thread_pool(self): | ||||||
|  |         '''Handle tier-II futures.''' | ||||||
|  |         if self._thread_pool_2 is None: | ||||||
|  |             self._thread_pool_2 = ThreadPoolExecutor(max_workers=self.workers) | ||||||
|  |         return self._thread_pool_2 | ||||||
| 
 | 
 | ||||||
|     def register( |     def register( | ||||||
|         self, |         self, | ||||||
| @ -131,6 +239,8 @@ class Router[E: Event]: | |||||||
|         pattern, |         pattern, | ||||||
|         debounce=200, |         debounce=200, | ||||||
|         delay=10, |         delay=10, | ||||||
|  |         callback_timeout=None, | ||||||
|  |         condition=FrameDirective.CONTINUE_WITHOUT, | ||||||
|         **listener_kwargs, |         **listener_kwargs, | ||||||
|     ): |     ): | ||||||
|         ''' |         ''' | ||||||
| @ -157,8 +267,17 @@ class Router[E: Event]: | |||||||
|                      ``filter(...)``) |                      ``filter(...)``) | ||||||
|             debounce: |             debounce: | ||||||
|             delay: |             delay: | ||||||
|  |             callback_timeout: timeout for waiting  | ||||||
|         ''' |         ''' | ||||||
|         route_tuple = (callback, pattern, debounce, delay, listener_kwargs) |         route_tuple = ( | ||||||
|  |             callback, | ||||||
|  |             pattern, | ||||||
|  |             debounce, | ||||||
|  |             delay, | ||||||
|  |             callback_timeout, | ||||||
|  |             condition, | ||||||
|  |             listener_kwargs | ||||||
|  |         ) | ||||||
|         self.routemap[endpoint].append(route_tuple) |         self.routemap[endpoint].append(route_tuple) | ||||||
| 
 | 
 | ||||||
|     def submit(self, events: E | list[E], callbacks: list[Callable] | None = None): |     def submit(self, events: E | list[E], callbacks: list[Callable] | None = None): | ||||||
| @ -178,7 +297,13 @@ class Router[E: Event]: | |||||||
| 
 | 
 | ||||||
|         return futures |         return futures | ||||||
| 
 | 
 | ||||||
|     def submit_event(self, event: E, callbacks: list[Callable] | None = None): |     def submit_event( | ||||||
|  |         self, | ||||||
|  |         event      : E, | ||||||
|  |         callbacks  : list[Callable] | None       = None, | ||||||
|  |         timeouts   : list[int|float] | None      = None, | ||||||
|  |         conditions : list[FrameDirective] | None = None, | ||||||
|  |     ) -> list: | ||||||
|         ''' |         ''' | ||||||
|         Group up and submit all matching callbacks for ``event``. All callbacks are ran |         Group up and submit all matching callbacks for ``event``. All callbacks are ran | ||||||
|         concurrently in their own threads, and this method blocks until all are completed. |         concurrently in their own threads, and this method blocks until all are completed. | ||||||
| @ -198,7 +323,7 @@ class Router[E: Event]: | |||||||
|             # this may be split across threads mid-check, preventing one thread from |             # this may be split across threads mid-check, preventing one thread from | ||||||
|             # handling the blocking of the entire group |             # handling the blocking of the entire group | ||||||
|             with self._route_lock: |             with self._route_lock: | ||||||
|                 callbacks = self.matching_routes(event) |                 callbacks, timeouts, conditions = self.matching_routes(event) | ||||||
| 
 | 
 | ||||||
|         # stop early if no work to do |         # stop early if no work to do | ||||||
|         if len(callbacks) == 0: |         if len(callbacks) == 0: | ||||||
| @ -210,34 +335,32 @@ class Router[E: Event]: | |||||||
|             self.queue_callbacks(event_idx, callbacks) |             self.queue_callbacks(event_idx, callbacks) | ||||||
|             return [] |             return [] | ||||||
| 
 | 
 | ||||||
|  |         # TODO: Chesterton's fence | ||||||
|         # callbacks now computed, flush the running event |         # callbacks now computed, flush the running event | ||||||
|         # note: a separate thread could queue valid callbacks since the running check; |         # note: a separate thread could queue valid callbacks since the running check; | ||||||
|         # o/w we know the running index is empty |         # o/w we know the running index is empty | ||||||
|         self.running_events[event_idx] = self.running_events[event_idx] |         # self.running_events[event_idx] = self.running_events[event_idx] | ||||||
| 
 | 
 | ||||||
|         # submit matching callbacks and wait for them to complete |         # submit matching callbacks and wait for them to complete | ||||||
|         future_results = self.wait_on_callbacks(callbacks, event) |         # *may* raise a FrameCancelledError, which we let propagate upward | ||||||
|  |         completed_futures = self.wait_on_event_callbacks( | ||||||
|  |             event, | ||||||
|  |             callbacks, | ||||||
|  |             timeouts=timeouts, | ||||||
|  |             conditions=conditions, | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         # finally call post event-group callbacks (only if some event callbacks were |         # finally call post event-group callbacks (only if some event callbacks were | ||||||
|         # submitted), wait for them to complete |         # submitted), wait for them to complete | ||||||
|         if future_results: |         if completed_futures: | ||||||
|             self.wait_on_futures([ |             wait([ | ||||||
|                 self.submit_callback(post_callback, event, future_results) |                 self.submit_event_callback(post_callback, event, completed_futures)[0] | ||||||
|                 for post_callback in self.post_callbacks |                 for post_callback in self.post_callbacks | ||||||
|             ]) |             ]) | ||||||
| 
 | 
 | ||||||
|         return future_results |         return completed_futures | ||||||
| 
 |  | ||||||
|     def submit_callback(self, callback: Callable, *args, **kwargs): |  | ||||||
|         ''' |  | ||||||
|         Note: this method is expected to return a future. Perform any event-based |  | ||||||
|         filtering before submitting a callback with this method. |  | ||||||
|         ''' |  | ||||||
|         # exit immediately if exit flag is set |  | ||||||
|         # if self.should_exit: |  | ||||||
|         #     return |  | ||||||
|         callback = self.wrap_safe_callback(callback) |  | ||||||
| 
 | 
 | ||||||
|  |     def _submit_with_thread_pool(self, thread_pool, callback: Callable, *args, **kwargs): | ||||||
|         if inspect.iscoroutinefunction(callback): |         if inspect.iscoroutinefunction(callback): | ||||||
|             if self.loop is None: |             if self.loop is None: | ||||||
|                 self.loop = asyncio.new_event_loop() |                 self.loop = asyncio.new_event_loop() | ||||||
| @ -251,15 +374,48 @@ class Router[E: Event]: | |||||||
|                 self.loop, |                 self.loop, | ||||||
|             ) |             ) | ||||||
|         else: |         else: | ||||||
|             future = self.thread_pool.submit( |             future = thread_pool.submit( | ||||||
|                 callback, *args, **kwargs |                 callback, *args, **kwargs | ||||||
|             ) |             ) | ||||||
|             self._active_futures.add(future) |  | ||||||
|             future.add_done_callback(self.general_task_done) |  | ||||||
| 
 | 
 | ||||||
|         return future |         return future | ||||||
| 
 | 
 | ||||||
|     def matching_routes(self, event: E, event_time=None): |     def submit_callback(self, callback: Callable, *args, **kwargs): | ||||||
|  |         future = self._submit_with_thread_pool( | ||||||
|  |             self.primary_thread_pool, | ||||||
|  |             callback, | ||||||
|  |             *args, **kwargs | ||||||
|  |         ) | ||||||
|  |         self._active_futures.add(future) | ||||||
|  | 
 | ||||||
|  |         return future | ||||||
|  | 
 | ||||||
|  |     def submit_event_callback(self, callback: Callable, event: E, *args, **kwargs): | ||||||
|  |         ''' | ||||||
|  |         Note: this method is expected to return a future. Perform any event-based | ||||||
|  |         filtering before submitting a callback with this method. | ||||||
|  |         ''' | ||||||
|  |         # exit immediately if exit flag is set | ||||||
|  |         # if self.should_exit: | ||||||
|  |         #     return | ||||||
|  |         submitted_time = time.time() | ||||||
|  |         callback = self.wrap_timed_callback(callback, submitted_time) | ||||||
|  | 
 | ||||||
|  |         future = self._submit_with_thread_pool( | ||||||
|  |             self.secondary_thread_pool, | ||||||
|  |             callback, | ||||||
|  |             event, | ||||||
|  |             *args, **kwargs | ||||||
|  |         ) | ||||||
|  |         future.add_done_callback(self.general_callback) | ||||||
|  | 
 | ||||||
|  |         return future, submitted_time | ||||||
|  | 
 | ||||||
|  |     def matching_routes( | ||||||
|  |         self, | ||||||
|  |         event: E, | ||||||
|  |         event_time = None | ||||||
|  |     ) -> tuple[list[Callable], list[int|float]]: | ||||||
|         ''' |         ''' | ||||||
|         Return eligible matching routes for the provided event. |         Return eligible matching routes for the provided event. | ||||||
| 
 | 
 | ||||||
| @ -292,12 +448,14 @@ class Router[E: Event]: | |||||||
|             debounce on the first matching action. |             debounce on the first matching action. | ||||||
|         ''' |         ''' | ||||||
|         matches    = [] |         matches    = [] | ||||||
|  |         timeouts   = [] | ||||||
|  |         conditions = [] | ||||||
|         endpoint   = event.endpoint |         endpoint   = event.endpoint | ||||||
|         name       = event.name |         name       = event.name | ||||||
|         #action     = tuple(event.action) # should be more general |         #action     = tuple(event.action) # should be more general | ||||||
|         event_time = time.time()*1000 if event_time is None else event_time |         event_time = time.time()*1000 if event_time is None else event_time | ||||||
| 
 | 
 | ||||||
|         for (callback, pattern, debounce, delay, listen_kwargs) in self.routemap[endpoint]: |         for (callback, pattern, debounce, delay, ctimeout, condition, listen_kwargs) in self.routemap[endpoint]: | ||||||
|             #index = (endpoint, name, action, callback, pattern, debounce, delay) |             #index = (endpoint, name, action, callback, pattern, debounce, delay) | ||||||
|             index = (endpoint, name, callback, pattern, debounce, delay) |             index = (endpoint, name, callback, pattern, debounce, delay) | ||||||
| 
 | 
 | ||||||
| @ -305,9 +463,7 @@ class Router[E: Event]: | |||||||
|                 # reject event |                 # reject event | ||||||
|                 continue |                 continue | ||||||
| 
 | 
 | ||||||
|             callback_name = str(callback) |             callback_name = get_func_name(callback) | ||||||
|             if hasattr(callback, '__name__'): |  | ||||||
|                 callback_name = callback.__name__ |  | ||||||
| 
 | 
 | ||||||
|             name_text     = color_text(name,               Fore.BLUE) |             name_text     = color_text(name,               Fore.BLUE) | ||||||
|             pattern_text  = color_text(pattern,            Fore.BLUE) |             pattern_text  = color_text(pattern,            Fore.BLUE) | ||||||
| @ -317,6 +473,8 @@ class Router[E: Event]: | |||||||
|             if self.filter(event, pattern, **listen_kwargs): |             if self.filter(event, pattern, **listen_kwargs): | ||||||
|                 # note that delayed callbacks are added |                 # note that delayed callbacks are added | ||||||
|                 matches.append(self.get_delayed_callback(callback, delay, index)) |                 matches.append(self.get_delayed_callback(callback, delay, index)) | ||||||
|  |                 timeouts.append(ctimeout) | ||||||
|  |                 conditions.append(condition) | ||||||
| 
 | 
 | ||||||
|                 # set next debounce  |                 # set next debounce  | ||||||
|                 self.next_allowed_time[index] = event_time + debounce |                 self.next_allowed_time[index] = event_time + debounce | ||||||
| @ -331,7 +489,7 @@ class Router[E: Event]: | |||||||
|                     f'Event [{name_text}] {match_text} against [{pattern_text}] under [{endpoint_text}] for [{callback_text}]' |                     f'Event [{name_text}] {match_text} against [{pattern_text}] under [{endpoint_text}] for [{callback_text}]' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|         return matches |         return matches, timeouts, conditions | ||||||
| 
 | 
 | ||||||
|     def get_delayed_callback(self, callback: Callable, delay: int|float, index): |     def get_delayed_callback(self, callback: Callable, delay: int|float, index): | ||||||
|         ''' |         ''' | ||||||
| @ -356,29 +514,195 @@ class Router[E: Event]: | |||||||
| 
 | 
 | ||||||
|         return self.callback_registry[index] |         return self.callback_registry[index] | ||||||
| 
 | 
 | ||||||
|     def wait_on_futures(self, futures): |     def wait_on_event_callbacks( | ||||||
|  |         self, | ||||||
|  |         event      : E, | ||||||
|  |         callbacks  : list[Callable], | ||||||
|  |         timeouts   : list[int | float | None] | None    = None, | ||||||
|  |         conditions : list[FrameDirective | None] | None = None, | ||||||
|  |     ): #, *args, **kwargs): | ||||||
|         ''' |         ''' | ||||||
|         Block until all futures in ``futures`` are complete. Return collected results as a |         Waits for event-associated callbacks to complete. | ||||||
|         list, and log warnings when a future fails. |  | ||||||
|         ''' |  | ||||||
|         future_results = [] |  | ||||||
|         for future in as_completed(futures): |  | ||||||
|             try: |  | ||||||
|                 if not future.cancelled(): |  | ||||||
|                     future_results.append(future.result()) |  | ||||||
|             except Exception as e: |  | ||||||
|                 logger.warning(f"Router callback job failed with exception \"{e}\"") |  | ||||||
| 
 | 
 | ||||||
|         return future_results |         Submits each callback in ``callbacks`` to the thread pool (via | ||||||
|  |         ``submit_callback``), passing ``event`` as the only parameter to each. Once | ||||||
|  |         started, the future for callback ``callbacks[i]`` will have ``timeouts[i]`` | ||||||
|  |         seconds to complete. If it has not completed in this time, the future's result is | ||||||
|  |         set to the Timeout exception | ||||||
| 
 | 
 | ||||||
|     def wait_on_callbacks(self, callbacks: list[Callable], event: E, *args, **kwargs): |  | ||||||
|         ''' |  | ||||||
|         Overridable by inheriting classes based on callback structure |         Overridable by inheriting classes based on callback structure | ||||||
|         ''' |         ''' | ||||||
|         return self.wait_on_futures([ |         if timeouts is None: | ||||||
|             self.submit_callback(callback, event, *args, **kwargs) |             timeouts = [None]*len(callbacks) | ||||||
|             for callback in callbacks | 
 | ||||||
|         ]) |         if conditions is None: | ||||||
|  |             conditions = [FrameDirective.CONTINUE_WITHOUT]*len(callbacks) | ||||||
|  | 
 | ||||||
|  |         future_map = {} | ||||||
|  |         timed_futures = set() | ||||||
|  |         for callback, timeout, on_err in zip(callbacks, timeouts, conditions): | ||||||
|  |             # "raw" callback here is the reference that will be indexed in `.callback_start_times` | ||||||
|  |             future, submitted_time = self.submit_event_callback(callback, event) #*args, **kwargs) | ||||||
|  |             future_map[future] = ( | ||||||
|  |                 callback, | ||||||
|  |                 get_func_name(callback), | ||||||
|  |                 future, | ||||||
|  |                 submitted_time, | ||||||
|  |                 timeout, | ||||||
|  |                 on_err | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             if timeout is not None: | ||||||
|  |                 timed_futures.add(future) | ||||||
|  | 
 | ||||||
|  |         completed_futures = [] | ||||||
|  |         # iterate while there are some futures not completed (cancelled, or finished w/ | ||||||
|  |         # exception or return value). When completed, a future is removed from `futures`. | ||||||
|  |         while future_map: | ||||||
|  |             min_timeout = float('inf') | ||||||
|  |             expired_futures = [] # actively running but expired | ||||||
|  | 
 | ||||||
|  |             if timed_futures: | ||||||
|  |                 time_now = time.time() | ||||||
|  |                 for future in list(timed_futures): | ||||||
|  |                     callback, cback_name, future, submitted_time, timeout, on_err = future_map[future] | ||||||
|  |                     future_tuple = (future, cback_name, on_err) | ||||||
|  |                     callback_id = (callback, submitted_time) | ||||||
|  | 
 | ||||||
|  |                     if future.done(): | ||||||
|  |                         timed_futures.remove(future) | ||||||
|  |                         continue | ||||||
|  | 
 | ||||||
|  |                     if future.running() and callback_id in self.callback_start_times: | ||||||
|  |                         time_running = time_now - self.callback_start_times[callback_id] | ||||||
|  |                         time_left = timeout - time_running | ||||||
|  | 
 | ||||||
|  |                         # track running futures that have timed out | ||||||
|  |                         if time_left < 0: | ||||||
|  |                             expired_futures.append(future_tuple) | ||||||
|  |                             continue | ||||||
|  |                     else: | ||||||
|  |                         # running w/o a start time, or queued. Set time-left to the timeout | ||||||
|  |                         time_left = timeout | ||||||
|  | 
 | ||||||
|  |                     # track running future w/ smallest non-zero time left before timeout | ||||||
|  |                     min_timeout = min(min_timeout, time_left) | ||||||
|  | 
 | ||||||
|  |             done_futures = []  | ||||||
|  |             for future in list(future_map.keys()): | ||||||
|  |                 _, cback_name, future, _, _, on_err = future_map[future] | ||||||
|  |                 future_tuple = (future, cback_name, on_err) | ||||||
|  | 
 | ||||||
|  |                 if future.done(): | ||||||
|  |                     # done futures have a set result or exception and now immutable | ||||||
|  |                     done_futures.append(future_tuple) # local | ||||||
|  |                     completed_futures.append(future) # global | ||||||
|  |                     future_map.pop(future) | ||||||
|  | 
 | ||||||
|  |             done_with_exception = [ | ||||||
|  |                 ftuple | ||||||
|  |                 for ftuple in done_futures | ||||||
|  |                 if ftuple[0].exception() is not None | ||||||
|  |             ] | ||||||
|  |             frame_cancellable = expired_futures + done_with_exception | ||||||
|  | 
 | ||||||
|  |             cancel_frame = False | ||||||
|  |             # set a timeout exception for all expired futures, and determine | ||||||
|  |             # if any of these timeouts should result in a frame cancellation | ||||||
|  |             for _, _, on_err in frame_cancellable: | ||||||
|  |                 if on_err == FrameDirective.CANCEL_FRAME: | ||||||
|  |                     cancel_frame = True | ||||||
|  |                     break | ||||||
|  | 
 | ||||||
|  |             # set exceptions on running expired futures | ||||||
|  |             for expired_future, cback_name, _ in expired_futures: | ||||||
|  |                 # results are immutable once set; double check doneness | ||||||
|  |                 if not expired_future.done(): | ||||||
|  |                     expired_future.set_exception( | ||||||
|  |                         CallbackTimeoutError( | ||||||
|  |                             f'Event callback {cback_name} timed out with condition {on_err}' | ||||||
|  |                         ) | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |             if cancel_frame: | ||||||
|  |                 # cancel the active frame. Expired futures have already been handled, and | ||||||
|  |                 # we now need to handle running futures (not cancellable) and pending | ||||||
|  |                 # futures (cancellable) | ||||||
|  |   | ||||||
|  |                 # explicitly cancel queued futures before looping through active futures | ||||||
|  |                 for future in list(future_map.keys()): | ||||||
|  |                     # cancellable futures not yet started | ||||||
|  |                     # can't really be more specific here, sets a CancelledError  | ||||||
|  |                     future.cancel() | ||||||
|  | 
 | ||||||
|  |                 for future in list(future_map.keys()): | ||||||
|  |                     _, cback_name, future, _, _, _ = future_map[future] | ||||||
|  | 
 | ||||||
|  |                     if future.done(): | ||||||
|  |                         # includes the expired futures whose exceptions were just set & | ||||||
|  |                         # those cancelled, although managing `future_map` isn't needed at | ||||||
|  |                         # this stage | ||||||
|  |                         future_map.pop(future) | ||||||
|  |                     elif future.running(): | ||||||
|  |                         # possible memory leak | ||||||
|  |                         future.set_exception( | ||||||
|  |                             CancelledFrameError( | ||||||
|  |                                 f'Indirect frame cancellation for event callback {cback_name}' | ||||||
|  |                             ) | ||||||
|  |                         ) | ||||||
|  |                         # attempt to communicate with threaded processes for graceful shutdown | ||||||
|  |                         # -> set shutdown flags | ||||||
|  |                         ... | ||||||
|  | 
 | ||||||
|  |                 # finally, raise an exception indicating the frame was cancelled | ||||||
|  |                 raise CancelledFrameError | ||||||
|  | 
 | ||||||
|  |             # if no expired futures (or they can be ignored with CONTINUE_WITHOUT), carry | ||||||
|  |             # on. Wait for the future with the least time remaining; we shouldn't need to | ||||||
|  |             # do any of the above future parsing until at least min-timeout more seconds. | ||||||
|  |             timeout = None | ||||||
|  |             if min_timeout < float('inf'): | ||||||
|  |                 timeout = min_timeout | ||||||
|  | 
 | ||||||
|  |             # if any remaining futures can produce CANCELLED FRAMEs, wait only on them. | ||||||
|  |             # This lets us be maximally dynamic:  | ||||||
|  |             # -> We respond before the timeout only if the newly completed might CANCEL | ||||||
|  |             #    the FRAME, which we want to do as soon as possible. | ||||||
|  |             # -> If not responding early, we wait at most `timeout` seconds to be back and | ||||||
|  |             #    checking if the *earliest possible timeout violation* has occurred. If | ||||||
|  |             #    so, we want to handle it immediately no matter what so I can set the | ||||||
|  |             #    desired Timeout exception as *early as possible*. If I don't, the future | ||||||
|  |             #    might finish and set its own result, shutting me out when I would've | ||||||
|  |             #    liked to capture the timeout violation. | ||||||
|  |             # -> When CANCEL FRAME futures are available, I don't wait on all pending | ||||||
|  |             #    futures as I *don't mind* being late to them. I only need to check | ||||||
|  |             #    immediately on CANCEL FRAMEs, and otherwise be back no later than | ||||||
|  |             #    `timeout`. If there's no `timeout` I'll wait for the first CANCEL FRAME, | ||||||
|  |             #    if no CANCEL FRAME I'll wait up to timeout on all futures, and if neither | ||||||
|  |             #    I'll simply wait until all futures complete. | ||||||
|  |             cancel_frame_futures = [ | ||||||
|  |                 future | ||||||
|  |                 for future, ftuple in future_map.items() | ||||||
|  |                 if ftuple[-1] == FrameDirective.CANCEL_FRAME | ||||||
|  |             ] | ||||||
|  | 
 | ||||||
|  |             return_when = concurrent.futures.ALL_COMPLETED | ||||||
|  |             wait_futures = future_map.keys() | ||||||
|  |             if cancel_frame_futures: | ||||||
|  |                 return_when = concurrent.futures.FIRST_COMPLETED | ||||||
|  |                 wait_futures = cancel_frame_futures | ||||||
|  | 
 | ||||||
|  |             # if no active future has a timeout | ||||||
|  |             done, _ = wait( | ||||||
|  |                 wait_futures, | ||||||
|  |                 timeout=timeout, | ||||||
|  |                 return_when=return_when | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |         # add any trailing completed futures | ||||||
|  |         completed_futures.extend(done) | ||||||
|  | 
 | ||||||
|  |         return completed_futures | ||||||
| 
 | 
 | ||||||
|     def queue_callbacks(self, event_idx, callbacks: list[Callable]): |     def queue_callbacks(self, event_idx, callbacks: list[Callable]): | ||||||
|         ''' |         ''' | ||||||
| @ -386,7 +710,7 @@ class Router[E: Event]: | |||||||
|         ''' |         ''' | ||||||
|         self.running_events[event_idx].update(callbacks) |         self.running_events[event_idx].update(callbacks) | ||||||
| 
 | 
 | ||||||
|     def wrap_safe_callback(self, callback: Callable): |     def wrap_timed_callback(self, callback: Callable, submitted_time): | ||||||
|         ''' |         ''' | ||||||
|         Check for shutdown flag and exit before running the callbacks.  |         Check for shutdown flag and exit before running the callbacks.  | ||||||
| 
 | 
 | ||||||
| @ -394,9 +718,8 @@ class Router[E: Event]: | |||||||
|         an interrupt is received. |         an interrupt is received. | ||||||
|         ''' |         ''' | ||||||
|         def safe_callback(callback, *args, **kwargs): |         def safe_callback(callback, *args, **kwargs): | ||||||
|             if self.should_exit: |             # track when this task actually begins | ||||||
|                 logger.debug('Exiting early from queued callback') |             self.callback_start_times[(callback, submitted_time)] = time.time() | ||||||
|                 return |  | ||||||
| 
 | 
 | ||||||
|             return callback(*args, **kwargs)  |             return callback(*args, **kwargs)  | ||||||
| 
 | 
 | ||||||
| @ -448,31 +771,67 @@ class Router[E: Event]: | |||||||
| 
 | 
 | ||||||
|     def clear_event(self, event: E, future): |     def clear_event(self, event: E, future): | ||||||
|         ''' |         ''' | ||||||
|         Clear an event. Pops the passed event out of ``running_events``, and the request |         Clear an event. Pops the passed event out of ``running_events``, and if the | ||||||
|         counter is >0, the event is re-submitted. |         request counter is >0, the event is re-submitted. | ||||||
| 
 | 
 | ||||||
|         This method is attached as a "done" callback to the main event wrapping job |         This method is attached as a "done" callback to the main event wrapping job | ||||||
|         ``submit_event``. The ``future`` given to this method is one to which it was |         ``submit_event``. The ``future`` given to this method is one to which it was | ||||||
|         attached as this "done" callback. This method should only be called when that |         attached as this "done" callback. This method should only be called when that | ||||||
|         ``future`` is finished running (or failed). If any jobs were submitted in the |         ``future`` is finished running (or failed). If any jobs were submitted in the | ||||||
|         wrapper task, the future results here should be non-empty. We use this fact to |         wrapper task, the future results here should be non-empty (even if the methods | ||||||
|         filter out non-work threads that call this method. Because even the |         don't return anything; we'll at least have ``[None,...]`` if we scheduled at least | ||||||
|         ``matching_routes`` check is threaded, we can't wait to see an event has no work to |         one callback). We use this fact to filter out non-work threads that call this | ||||||
|         schedule, and thus can't prevent this method being attached as a "done" callback. |         method. Because even the ``matching_routes`` check is threaded, we can't wait to | ||||||
|         The check for results from the passed future allows us to know when in fact a |         see an event has no work to schedule, and thus can't prevent this method being | ||||||
|         valid frame has finished, and a resubmission may be on the table. |         attached as a "done" callback. The check for results from the passed future allows | ||||||
|  |         us to know when in fact a valid frame has finished, and a resubmission may be on | ||||||
|  |         the table. | ||||||
|  | 
 | ||||||
|  |         .. admonition:: Why we don't need to worry about resubmission w/ empty results | ||||||
|  | 
 | ||||||
|  |             Note that, even though we can't check for whether there will be any matching | ||||||
|  |             routes prior to calling ``submit_event`` (we have to wait until we're in that | ||||||
|  |             method, at which point this method will already be attached as a callback), | ||||||
|  |             the event will never be marked as "running" and added to ``running_events``. | ||||||
|  |             This means that we won't queue up any callbacks for the same event while | ||||||
|  |             waiting on it, and then exit early here, never to re-submit. The worry was | ||||||
|  |             that a event response might match 0 callbacks (e.g., due to debouncing) and | ||||||
|  |             return ``[]`` from ``submit_event``, but *while waiting for this to complete*, | ||||||
|  |             the same event is submitted and matches (e.g., debouncing timers now allowing | ||||||
|  |             the event through). This could mean that the repeat event gets queued behind | ||||||
|  |             the event in ``running_events`` and should be resubmitted here as a "queued | ||||||
|  |             callback," but *won't do so* because we exit early if no results are obtained. | ||||||
|  |             This is **not an issue** because the original event (that didn't match any | ||||||
|  |             callbacks) will never be marked as running, and thus never prevent the second | ||||||
|  |             event from itself running if in fact in matches a non-empty callback set. This | ||||||
|  |             means that an empty future result set seen here indicates both 1) that no work | ||||||
|  |             took place, and 2) no conflicting events were prevented from running, and we | ||||||
|  |             can exit early here. | ||||||
|         ''' |         ''' | ||||||
|         result = None |         self._active_futures.remove(future) | ||||||
|         if not future.cancelled(): |  | ||||||
|             result = future.result() |  | ||||||
|         else: |  | ||||||
|             return None |  | ||||||
| 
 | 
 | ||||||
|         # result should be *something* if work was scheduled |         # result should be *something* if work was scheduled, since `submit_event` wraps | ||||||
|         if not result: |         # up futures in a list. If no result, event was never marked active, and don't | ||||||
|             return None |         # want to resubmit as any duplicates were allowed to start. Attempt to get result, | ||||||
|  |         # returning if it's (None or []) or raised an Exception (possibly a | ||||||
|  |         # FrameCancelledError if there's a frame issue, for a CancelledError if the tier-I | ||||||
|  |         # task was successfully cancelled following a `.shutdown()` call) | ||||||
|  |         try: | ||||||
|  |             if not future.result(): return | ||||||
|  |         except concurrent.futures.CancelledError as e: | ||||||
|  |             #logger.error(f'Tier-I future cancelled') | ||||||
|  |             # do not re-raise; outer context can handle cancellations | ||||||
|  |             return | ||||||
|  |         except CancelledFrameError as e: | ||||||
|  |             # do not re-raise; outer context will manage frame cancellations | ||||||
|  |             return | ||||||
|  |         except Exception as e: | ||||||
|  |             # print traceback for unexpected exception | ||||||
|  |             logger.error(f'Unexpected exception in tier-I future: "{e}"') | ||||||
|  |             traceback.print_exc() | ||||||
|  |             return | ||||||
| 
 | 
 | ||||||
|         self.event_log.append((event, result)) |         self.event_log.append((event, future)) | ||||||
|         queued_callbacks = self.stop_event(event) |         queued_callbacks = self.stop_event(event) | ||||||
| 
 | 
 | ||||||
|         # resubmit event if some queued work remains |         # resubmit event if some queued work remains | ||||||
| @ -485,28 +844,38 @@ class Router[E: Event]: | |||||||
|     def event_index(self, event): |     def event_index(self, event): | ||||||
|         return event[:2] |         return event[:2] | ||||||
| 
 | 
 | ||||||
|  |     def general_callback(self, future): | ||||||
|  |         try: | ||||||
|  |             future.result() | ||||||
|  |         except concurrent.futures.CancelledError as e: | ||||||
|  |             logger.error(f'Tier-II future cancelled; "{e}"') | ||||||
|  |         except CancelledFrameError as e: | ||||||
|  |             logger.error(f'Tier-II frame cancelled; "{e}"') | ||||||
|  |         except Exception as e: | ||||||
|  |             logger.warning(f'Tier-II job failed with unknown exception "{e}"') | ||||||
|  | 
 | ||||||
|     def shutdown(self): |     def shutdown(self): | ||||||
|         logger.info(color_text('Router shutdown received', Fore.BLACK, Back.RED)) |         logger.info(color_text('Router shutdown received', Fore.BLACK, Back.RED)) | ||||||
| 
 |  | ||||||
|         self.should_exit = True |         self.should_exit = True | ||||||
|  | 
 | ||||||
|  |         # manually track and cancel pending futures b/c `.shutdown(cancel_futures=True)` | ||||||
|  |         # is misleading, and will cause an outer `as_completed` loop to hang | ||||||
|         for future in tqdm( |         for future in tqdm( | ||||||
|             list(self._active_futures), |             list(self._active_futures), | ||||||
|             desc=color_text('Cancelling active futures...', Fore.BLACK, Back.RED), |             desc=color_text( | ||||||
|  |                 f'Cancelling {len(self._active_futures)} pending futures...', | ||||||
|  |                 Fore.BLACK, Back.RED), | ||||||
|             colour='red', |             colour='red', | ||||||
|         ): |         ): | ||||||
|             future.cancel() |             future.cancel() | ||||||
| 
 | 
 | ||||||
|         if self.thread_pool is not None: |         if self._thread_pool_2 is not None: | ||||||
|             self.thread_pool.shutdown(wait=False) |             # cancel pending futures (i.e., those not started) | ||||||
|  |             self.secondary_thread_pool.shutdown(wait=False) | ||||||
| 
 | 
 | ||||||
|     def general_task_done(self, future): |         if self._thread_pool_1 is not None: | ||||||
|         self._active_futures.remove(future) |             # cancel pending futures (i.e., those not started) | ||||||
|         try: |             self.primary_thread_pool.shutdown(wait=False) | ||||||
|             if not future.cancelled(): |  | ||||||
|                 future.result() |  | ||||||
|         except Exception as e: |  | ||||||
|             logger.error(f"Exception occurred in threaded task: '{e}'") |  | ||||||
|             #traceback.print_exc() |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class ChainRouter[E: Event](Router[E]): | class ChainRouter[E: Event](Router[E]): | ||||||
| @ -538,7 +907,11 @@ class ChainRouter[E: Event](Router[E]): | |||||||
|         for endpoint, routelist in router.routemap.items(): |         for endpoint, routelist in router.routemap.items(): | ||||||
|             self.routemap[endpoint].extend(routelist) |             self.routemap[endpoint].extend(routelist) | ||||||
| 
 | 
 | ||||||
|     def matching_routes(self, event: E, event_time=None): |     def matching_routes( | ||||||
|  |         self, | ||||||
|  |         event: E, | ||||||
|  |         event_time=None | ||||||
|  |     ): | ||||||
|         ''' |         ''' | ||||||
|         Colloquial ``callbacks`` now used as a dict of lists of callbacks, indexed by |         Colloquial ``callbacks`` now used as a dict of lists of callbacks, indexed by | ||||||
|         router, and only having keys for routers with non-empty callback lists. |         router, and only having keys for routers with non-empty callback lists. | ||||||
| @ -546,25 +919,51 @@ class ChainRouter[E: Event](Router[E]): | |||||||
|         if event_time is None: |         if event_time is None: | ||||||
|             event_time = time.time()*1000 |             event_time = time.time()*1000 | ||||||
| 
 | 
 | ||||||
|         route_map = {} |         callback_map  = {} | ||||||
|  |         timeout_map   = {} | ||||||
|  |         condition_map = {} | ||||||
|         for router in self.ordered_routers: |         for router in self.ordered_routers: | ||||||
|             router_matches = router.matching_routes(event, event_time) |             matches, timeouts, conditions = router.matching_routes(event, event_time) | ||||||
|             if router_matches: |             if matches: | ||||||
|                 route_map[router] = router_matches |                 callback_map[router]  = matches | ||||||
|  |                 timeout_map[router]   = timeouts | ||||||
|  |                 condition_map[router] = conditions | ||||||
| 
 | 
 | ||||||
|         return route_map |         return callback_map, timeout_map, condition_map | ||||||
| 
 | 
 | ||||||
|     def wait_on_callbacks(self, callbacks, event: E, *args, **kwargs): |     def wait_on_event_callbacks( | ||||||
|  |         self, | ||||||
|  |         event      : E, | ||||||
|  |         callbacks  : dict[Router, list[Callable]], | ||||||
|  |         timeouts   : dict[Router, list[int | float | None]] | None    = None, | ||||||
|  |         conditions : dict[Router, list[FrameDirective | None]] | None = None, | ||||||
|  |     ): #, *args, **kwargs): | ||||||
|         ''' |         ''' | ||||||
|         Note: relies on order of callbacks dict matching that of ``ordered_routers``, which |         Returns a dictionary mapping from  | ||||||
|         should happen in ``matching_routes`` | 
 | ||||||
|  |         Note: relies on order of callback-associated dicts matching that of | ||||||
|  |         ``ordered_routers``, which should happen in ``matching_routes``. | ||||||
|  | 
 | ||||||
|  |         This method blurs the OOP lines a bit, as we're actually passing dicts rather than | ||||||
|  |         the lists expected by the super class. The parent ``submit_event`` is carefully | ||||||
|  |         designed to be indifferent; use caution when making changes. | ||||||
|         ''' |         ''' | ||||||
|         results = {} |         if timeouts is not None: | ||||||
|  |             timeouts = {} | ||||||
|  | 
 | ||||||
|  |         if conditions is not None: | ||||||
|  |             conditions = {} | ||||||
|  | 
 | ||||||
|  |         futures = {} | ||||||
|         for router, callback_list in callbacks.items(): |         for router, callback_list in callbacks.items(): | ||||||
|             router_results = router.submit_event(event, callbacks=callback_list) |             futures[router] = router.submit_event( | ||||||
|             results[router] = router_results |                 event, | ||||||
|  |                 callback_list, | ||||||
|  |                 timeouts=timeouts.get(router), | ||||||
|  |                 conditions=conditions.get(router), | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|         return results |         return futures | ||||||
| 
 | 
 | ||||||
|     def queue_callbacks(self, event_idx, callbacks): |     def queue_callbacks(self, event_idx, callbacks): | ||||||
|         for router, callback_list in callbacks.items(): |         for router, callback_list in callbacks.items(): | ||||||
| @ -601,6 +1000,12 @@ class ChainRouter[E: Event](Router[E]): | |||||||
|             router.extend_listener(listener) |             router.extend_listener(listener) | ||||||
|         return listener |         return listener | ||||||
| 
 | 
 | ||||||
|  |     def shutdown(self): | ||||||
|  |         super().shutdown() | ||||||
|  | 
 | ||||||
|  |         # for router in self.ordered_routers: | ||||||
|  |         #     router.shutdown() | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| # RouterBuilder | # RouterBuilder | ||||||
| def route(router, route_group, **route_kwargs): | def route(router, route_group, **route_kwargs): | ||||||
|  | |||||||
| @ -1,4 +1,5 @@ | |||||||
| import logging | import logging | ||||||
|  | import concurrent  | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from concurrent.futures import as_completed | from concurrent.futures import as_completed | ||||||
| 
 | 
 | ||||||
| @ -10,6 +11,7 @@ from co3.resources import DiskResource | |||||||
| from co3 import Differ, Syncer, Database | from co3 import Differ, Syncer, Database | ||||||
| 
 | 
 | ||||||
| from execlog.event import Event | from execlog.event import Event | ||||||
|  | from execlog.router import CancelledFrameError | ||||||
| from execlog.routers import PathRouter | from execlog.routers import PathRouter | ||||||
| from execlog.util.generic import color_text | from execlog.util.generic import color_text | ||||||
| 
 | 
 | ||||||
| @ -146,20 +148,44 @@ class PathRouterSyncer(Syncer[Path]): | |||||||
| 
 | 
 | ||||||
|         # note: we structure this future waiting like this for the TQDM view |         # note: we structure this future waiting like this for the TQDM view | ||||||
|         results = [] |         results = [] | ||||||
|         for future in tqdm( |         success = 0 | ||||||
|             as_completed(event_futures), |         cancelled = 0 | ||||||
|  |         errored = 0 | ||||||
|  |         submitted = len(event_futures) | ||||||
|  |         progress_bar = tqdm( | ||||||
|             total=len(event_futures), |             total=len(event_futures), | ||||||
|             desc=f'Awaiting chunk futures [submitted {len(event_futures)}]' |             desc=f'Awaiting chunk futures [submitted {len(event_futures)}]' | ||||||
|         ): |         ) | ||||||
|  | 
 | ||||||
|  |         for future in as_completed(event_futures): | ||||||
|             try: |             try: | ||||||
|                 if not future.cancelled(): |                 callback_future_list = future.result() | ||||||
|                     results.append(future.result()) |                 results.append(callback_future_list) | ||||||
|  |                 success += 1 | ||||||
|  |             except concurrent.futures.CancelledError as e: | ||||||
|  |                 cancelled += 1 | ||||||
|  |                 #logger.error(f'Event future cancelled; "{e}"') | ||||||
|  |             except CancelledFrameError as e: | ||||||
|  |                 errored += 1 | ||||||
|  |                 pass | ||||||
|  |                 #logger.error(f'Event frame cancelled; "{e}"') | ||||||
|             except Exception as e: |             except Exception as e: | ||||||
|                 logger.warning(f"Sync job failed with exception {e}") |                 errored += 1 | ||||||
|  |                 logger.warning(f'Sync job failed with unknown exception "{e}"') | ||||||
|  | 
 | ||||||
|  |             suc_txt = color_text(f'{success}',   Fore.GREEN) | ||||||
|  |             can_txt = color_text(f'{cancelled}', Fore.YELLOW) | ||||||
|  |             err_txt = color_text(f'{errored}',   Fore.RED) | ||||||
|  |             tot_txt = color_text(f'{success+cancelled+errored}', Style.BRIGHT) | ||||||
|  |             progress_bar.set_description( | ||||||
|  |                 f'Awaiting chunk futures [{tot_txt} / {submitted} | {suc_txt} {can_txt} {err_txt}]' | ||||||
|  |             ) | ||||||
|  |             progress_bar.update(n=1) | ||||||
|  | 
 | ||||||
|  |         progress_bar.close() | ||||||
| 
 | 
 | ||||||
|         return results |         return results | ||||||
| 
 | 
 | ||||||
|     def shutdown(self): |     def shutdown(self): | ||||||
|         super().shutdown() |         super().shutdown() | ||||||
|         self.router.shutdown() |         self.router.shutdown() | ||||||
| 
 |  | ||||||
|  | |||||||
| @ -8,6 +8,13 @@ from colorama import Fore, Back, Style | |||||||
| def color_text(text, *colorama_args): | def color_text(text, *colorama_args): | ||||||
|     return f"{''.join(colorama_args)}{text}{Style.RESET_ALL}" |     return f"{''.join(colorama_args)}{text}{Style.RESET_ALL}" | ||||||
| 
 | 
 | ||||||
|  | def get_func_name(func): | ||||||
|  |     func_name = str(func) | ||||||
|  |     if hasattr(func, '__name__'): | ||||||
|  |         func_name = func.__name__ | ||||||
|  | 
 | ||||||
|  |     return func_name | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class ColorFormatter(logging.Formatter): | class ColorFormatter(logging.Formatter): | ||||||
|     _format = '%(levelname)-8s :: %(name)s %(message)s' |     _format = '%(levelname)-8s :: %(name)s %(message)s' | ||||||
|  | |||||||
| @ -1,6 +1,7 @@ | |||||||
| import logging | import logging | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from functools import partial | from functools import partial | ||||||
|  | from concurrent.futures import wait | ||||||
| 
 | 
 | ||||||
| from execlog import util | from execlog import util | ||||||
| from execlog import ChainRouter, Event | from execlog import ChainRouter, Event | ||||||
| @ -41,7 +42,7 @@ def test_single_router_submission(): | |||||||
|         Event(endpoint='endpoint_proxy', name='file3'), |         Event(endpoint='endpoint_proxy', name='file3'), | ||||||
|     ] |     ] | ||||||
|     futures = router2.submit(events) |     futures = router2.submit(events) | ||||||
|     router2.wait_on_futures(futures) |     wait(futures) | ||||||
|      |      | ||||||
|     assert True |     assert True | ||||||
| 
 | 
 | ||||||
| @ -52,7 +53,7 @@ def test_chain_router_submission(): | |||||||
|         Event(endpoint='endpoint_proxy', name='file3'), |         Event(endpoint='endpoint_proxy', name='file3'), | ||||||
|     ] |     ] | ||||||
|     futures = chain_router.submit(events) |     futures = chain_router.submit(events) | ||||||
|     chain_router.wait_on_futures(futures) |     wait(futures) | ||||||
| 
 | 
 | ||||||
|     assert True |     assert True | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -94,7 +94,8 @@ def test_server_with_listeners(): | |||||||
|     server.shutdown() |     server.shutdown() | ||||||
|     thread.join() |     thread.join() | ||||||
| 
 | 
 | ||||||
|     # finally check the router event logs |     # finally check the router event logs: holds tier-I futures, which hold lists of | ||||||
|     assert router1.event_log[0][1] == ['router1 job success'] |     # tier-II futures | ||||||
|     assert router2.event_log[0][1] == ['router2 job success'] |     assert [r.result() for r in router1.event_log[0][1].result()] == ['router1 job success'] | ||||||
|  |     assert [r.result() for r in router2.event_log[0][1].result()] == ['router2 job success'] | ||||||
| 
 | 
 | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user