From 16e2e94e2a0a1983518a09922633b7a3a75b43df Mon Sep 17 00:00:00 2001
From: "Sam G."
Date: Sat, 11 May 2024 03:12:41 -0700
Subject: [PATCH] implement PathRouterSyncer for file-based diff management,
 make RouterBuilder subclass ChainRouter

---
 execlog/__init__.py         |   1 +
 execlog/listeners/path.py   |  14 ++--
 execlog/router.py           |  14 +++-
 execlog/routers/path.py     |  46 +++++++----
 execlog/syncers/__init__.py |   1 +
 execlog/syncers/router.py   | 148 ++++++++++++++++++++++++++++++++++++
 6 files changed, 199 insertions(+), 25 deletions(-)
 create mode 100644 execlog/syncers/__init__.py
 create mode 100644 execlog/syncers/router.py

diff --git a/execlog/__init__.py b/execlog/__init__.py
index 44fe1a4..f3fde07 100644
--- a/execlog/__init__.py
+++ b/execlog/__init__.py
@@ -1,5 +1,6 @@
 from execlog import util
 from execlog import routers
+from execlog import syncers
 from execlog import listeners
 
 from execlog.server import Server
diff --git a/execlog/listeners/path.py b/execlog/listeners/path.py
index d098ba7..ced60ea 100644
--- a/execlog/listeners/path.py
+++ b/execlog/listeners/path.py
@@ -159,9 +159,13 @@ class PathListener(Listener[FileEvent]):
         logger.info(f'> Listening on path {path} for flags {iflags.from_mask(flags)}')
 
         for (callback, pattern, debounce, delay, *_) in self.router.routemap[path]:
+            callback_name = str(callback)
+            if hasattr(callback, '__name__'):
+                callback_name = callback.__name__
+
             logger.info(
                 color_text(
-                    f'| > {pattern} -> {callback.__name__} (debounce {debounce}ms, delay {delay}ms)',
+                    f'| > {pattern} -> {callback_name} (debounce {debounce}ms, delay {delay}ms)',
                     Style.DIM,
                 )
             )
@@ -315,19 +319,17 @@ class PathListener(Listener[FileEvent]):
         If ``handle_events`` is called externally, note that this loop will block in the
         calling thread until the jobs have been submitted. It will *not* block until
         jobs have completed, however, as a list of futures is returned. The calling
-        Watcher instance may have already been started, in which case ``run()`` will
+        Listener instance may have already been started, in which case ``run()`` will
         already be executing in a separate thread. Calling this method externally will
         not interfere with this loop insofar as it adds jobs to the same thread pool.
 
         Because this method only submits jobs associated with the provided ``events``,
         the calling thread can await the returned list of futures and be confident
         that top-level callbacks associated with these file events have completed. Do
-        note that, if the Watcher has already been started, any propagating file
+        note that, if the Listener has already been started, any propagating file
         events will be picked up and possibly processed simultaneously (although their
         associated callbacks will have nothing to do with the returned list of futures).
         '''
-        from execlog.router import Event
-
         for event in events:
             # hard coded ignores
             if util.path.glob_match(event.name, util.path.IGNORE_PATTERNS):
                 continue
@@ -359,7 +361,7 @@ class PathListener(Listener[FileEvent]):
 
         logger.debug(f'Watcher fired for [{relpath}]: {mask_flags}')
 
-        route_event = Event(endpoint=str(path), name=str(relpath), action=mask_flags)
+        route_event = FileEvent(endpoint=str(path), name=str(relpath), action=mask_flags)
         self.router.submit(route_event)
 
         # handle renamed directories; old dir was being watched if these flags
diff --git a/execlog/router.py b/execlog/router.py
index 5b7e9da..ce3c64e 100644
--- a/execlog/router.py
+++ b/execlog/router.py
@@ -96,7 +96,7 @@ class Router[E: Event]:
         self.loop    = loop
         self.workers = workers
 
-        self.routemap : dict[str, list[tuple]] = defaultdict(list)
+        self.routemap: dict[str, list[tuple]] = defaultdict(list)
         self.post_callbacks = []
 
         # track running jobs by event
@@ -592,7 +592,7 @@ class RouteRegistryMeta(type):
 
         return super().__new__(cls, name, bases, attrs)
 
-class RouterBuilder(metaclass=RouteRegistryMeta):
+class RouterBuilder(ChainRouter, metaclass=RouteRegistryMeta):
     '''
     Builds a (Chain)Router using attached methods and passed options.
@@ -656,9 +656,11 @@ class RouterBuilder(metaclass=RouteRegistryMeta):
         register_map: dict[str, tuple[Router, dict[str, tuple[tuple[str, str], dict[str, Any]]]]],
     ):
         self.register_map = register_map
+        routers = []
 
         # register
         for router_name, (router, router_options) in self.register_map.items():
+            routers.append(router)
             for route_group, method_arg_list in self.route_registry[router_name].items():
                 # get post-callbacks for reserved key "post"
                 # assumed no kwargs for passthrough
@@ -686,6 +688,10 @@ class RouterBuilder(metaclass=RouteRegistryMeta):
                 }
             )
 
-    def get_router(self, router_key_list: list[str]):
-        return ChainRouter([self.register_map[k][0] for k in router_key_list])
+        super().__init__(routers)
+
+    # -- disabled for now in favor of inheriting from ChainRouter directly; the
+    # -- chain order is simply given by the order of router keys in the register_map
+    # def get_router(self, router_key_list: list[str]):
+    #     return ChainRouter([self.register_map[k][0] for k in router_key_list])
diff --git a/execlog/routers/path.py b/execlog/routers/path.py
index e2bcc76..5d9a1f6 100644
--- a/execlog/routers/path.py
+++ b/execlog/routers/path.py
@@ -46,24 +46,40 @@ class PathRouter(Router[FileEvent]):
     def filter(self, event, glob, **listen_kwargs) -> bool:
         '''
-        Note:
-            If ``handle_events`` is called externally, note that this loop will block in the
-            calling thread until the jobs have been submitted. It will _not_ block until
-            jobs have completed, however, as a list of futures is returned. The calling
-            Watcher instance may have already been started, in which case ``run()`` will
-            already be executing in a separate thread. Calling this method externally will
-            not interfere with this loop insofar as it adds jobs to the same thread pool.
+        Filter path events based on the provided glob pattern and listen arguments.
-
-            Because this method only submits jobs associated with the provided ``events``,
-            the calling thread can await the returned list of futures and be confident
-            that top-level callbacks associated with these file events have completed. Do
-            note that, if the Watcher has already been started, any propagating file
-            events will be picked up and possibly process simultaneously (although their
-            associated callbacks will have nothing to do with the return list of futures).
+
+        This method is needed due to the lack of granularity when you have separate router
+        callbacks that listen to the same directory (or overlap on some nested directory
+        therein) with *different listen flags*. The overlapping path in question will only
+        ever be assigned a single watch descriptor by iNotify, but will (or at least appears
+        to) add (via bitwise OR) new flags if the same path is registered. Thus, an event
+        fired by iNotify cannot be automatically propagated to the registered callbacks,
+        as the file event "action" may apply only to a subset of those functions. This is
+        the place for that final delineation, ensuring the exact action is matched before
+        callback execution. This has the benefit of being a suitable proxy for the actual
+        iNotify filtering that takes place when submitting synthetic events to the router
+        by hand.
+
+        **Bigger picture, and why we have to reproduce the work already done by an
+        event-based mechanism like iNotify**: realistically, such a method is needed
+        regardless of whether we hope to connect to the threaded router model, as we do not
+        canonically store callback associations at the listener level. If our responses
+        could be tied one-to-one to the sensitivities of iNotify events, then they could
+        be called directly in response to them. But they are not: we want to support
+        glob-based filtering, need to delineate by flags as explained above, and can have
+        separate endpoints for the same path. These are conditions *not* collapsed at the
+        iNotify level, and thus need to be fully re-implemented for callback matching.
+        (For example, if we had callback uniqueness on just endpoint and glob, i.e.,
+        no sensitivity to flags, then the flag-matching conditions implemented here would
+        not be needed to rightly pass iNotify events to their callbacks. In such a case,
+        we could rely fully on iNotify's flag response model to implicitly handle this
+        aspect of the filtering process. If the same could be said of the remaining
+        constraints, then, as mentioned, we could simply associate callbacks one-to-one
+        and avoid the auxiliary filtering altogether.)
 
         Parameters:
-            event : Event instance
-            glob : Single string or tuple of glob patterns to check against event endpoint
+            event: Event instance
+            glob: Single string or tuple of glob patterns to check against event endpoint
         '''
         not_tmp_glob = '**/!(.*|*.tmp|*~)'
         if not glob_match(Path(event.name), not_tmp_glob):
diff --git a/execlog/syncers/__init__.py b/execlog/syncers/__init__.py
new file mode 100644
index 0000000..104dcbb
--- /dev/null
+++ b/execlog/syncers/__init__.py
@@ -0,0 +1 @@
+from execlog.syncers.router import PathDiffer, PathRouterSyncer
diff --git a/execlog/syncers/router.py b/execlog/syncers/router.py
new file mode 100644
index 0000000..5274548
--- /dev/null
+++ b/execlog/syncers/router.py
@@ -0,0 +1,148 @@
+import logging
+from pathlib import Path
+from concurrent.futures import as_completed
+
+from tqdm import tqdm
+from colorama import Fore, Style
+from inotify_simple import flags as iflags
+
+from co3.resources import DiskResource
+from co3 import Differ, Syncer, Database
+
+from execlog.event import Event
+from execlog.routers import PathRouter
+
+# NOTE: the next two import paths are assumed; this module uses ``File`` and
+# ``color_text`` below without importing them in the hunk as posted, so adjust
+# these to wherever they actually live in co3/execlog.
+from co3.resources.disk import File
+from execlog.util.generic import color_text
+
+logger = logging.getLogger(__name__)
+
+
+class PathDiffer(Differ[Path]):
+    def __init__(
+        self,
+        database: Database,
+    ):
+        super().__init__(DiskResource(), database)
+
+    def l_transform(self, item):
+        '''
+        Transform ``(path, head)`` tuple from ``DiskResource``.
+        '''
+        return Path(*item)
+
+
+class PathRouterSyncer(Syncer[Path]):
+    def __init__(
+        self,
+        differ: PathDiffer,
+        router: PathRouter,
+    ):
+        super().__init__(differ)
+        self.router = router
+
+    def _construct_event(
+        self,
+        fpath: str | Path,
+        endpoint: str | Path,
+        action: bytes
+    ):
+        return Event(
+            endpoint=str(endpoint),
+            name=str(Path(fpath).relative_to(endpoint)),
+            action=[action],  # synthetic action to match any flag filters
+        )
+
+    def handle_l_excl(self, path: Path, disk_pairs: list):
+        '''
+        Handle disk-exclusive paths (i.e., those added to disk since the last sync).
+        '''
+        return [
+            self._construct_event(str(path), endpoint, iflags.CREATE)
+            for endpoint, _ in disk_pairs
+        ]
+
+    def handle_r_excl(self, path: Path, db_vals: list):
+        '''
+        Handle database-exclusive paths (i.e., those deleted from disk since the last
+        sync). Searches for matching endpoints under the attached router and creates
+        corresponding events.
+
+        .. admonition:: On lack of endpoints
+
+            This method handles database-exclusive items, i.e., paths no longer on disk
+            but still in the database. For typical Router designs, it is not important
+            to preserve possible endpoints of origin for this kind of event; what
+            matters is the absolute path of the file to be removed. In general, file
+            events are associated solely with a path, but in some cases we may be
+            sensitive to the base path seen to be "triggering" that file event, as
+            router methods can hook into specific endpoints. This has somewhat dubious
+            effects, as multiple events (with the same action) are dispatched for the
+            same file, purely to observe the Router convention of endpoints and allow
+            independent trajectories through the execution sequence.
+
+            One concern here is that you might, in theory, want to respond to the same
+            file deletion event in different ways under different endpoints. This will
+            be accessible when picking up such an event live, as endpoints are grouped
+            by watch descriptor and can all be triggered from the single file event.
+            This is the one case where we can't *really* simulate the event taking
+            place with the available data, and instead have to peer into the router to
+            see what root paths the file could theoretically trigger. Most of the time,
+            this won't be too problematic, since we'll be watching the same paths and
+            can tell where a deleted file would've been. But there are cases where a
+            watch path endpoint may be abandoned, and thus no callback will be there to
+            receive the DELETE event. *Routers should heavily consider implementing a
+            global DELETE handler to prevent these cases if it's critical to respond to
+            deletions.* Otherwise, we still make an attempt to propagate under
+            appropriate endpoints, allowing for possible "deconstructor-like" behavior
+            for specific filetypes (e.g., cleaning up auxiliary elements, writing to a
+            log, creating a backup, etc.).
+        '''
+        return [
+            self._construct_event(str(path), str(endpoint), iflags.DELETE)
+            for endpoint in self.router.routemap
+            if Path(path).is_relative_to(Path(endpoint))
+        ]
+
+    def handle_lr_int(self, path: Path, path_tuples: tuple[list, list]):
+        '''
+        Handle paths reflected both in the database and on disk.
+
+        Paths only reach this method if still present after being passed through
+        ``filter_diff_sets``, which will filter out those files that are up-to-date
+        in the database.
+        '''
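+        # NOTE (assumed shape): ``path_tuples`` holds the two sides of the
+        # intersection as ``(db_vals, disk_pairs)``; index 1 mirrors the
+        # ``disk_pairs`` of ``handle_l_excl`` above, so one MODIFY event is
+        # emitted per disk endpoint and flag-filtered callbacks still match.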
+        return [
+            self._construct_event(str(path), endpoint, iflags.MODIFY)
+            for endpoint, _ in path_tuples[1]
+        ]
+
+    def filter_diff_sets(self, l_excl, r_excl, lr_int):
+        total_disk_files = len(l_excl) + len(lr_int)
+
+        def file_out_of_sync(p):
+            db_el, disk_el = lr_int[p]
+            db_mtime = float(db_el[0].get('mtime', '0'))
+            disk_mtime = File(p, disk_el[0]).mtime
+
+            return disk_mtime > db_mtime
+
+        # keep only intersecting paths whose disk copy is newer than the DB record
+        lr_int = {p: v for p, v in lr_int.items() if file_out_of_sync(p)}
+
+        # compute out-of-sync details
+        oos_count = len(l_excl) + len(lr_int)
+        oos_prcnt = oos_count / max(total_disk_files, 1) * 100
+
+        # color_text(text, style) argument order, matching its use elsewhere in execlog
+        logger.info(color_text(f'{len(l_excl)} new files to add', Fore.GREEN))
+        logger.info(color_text(f'{len(lr_int)} modified files', Fore.YELLOW))
+        logger.info(color_text(f'{len(r_excl)} files to remove', Fore.RED))
+        logger.info(color_text(f'({oos_prcnt:.2f}%) of disk files out-of-sync', Style.DIM))
+
+        return l_excl, r_excl, lr_int
+
+    def process_chunk(self, event_sets):
+        chunk_events = [e for event_set in event_sets for e in event_set]
+        chunk_size = len(chunk_events)
+
+        # 1) flush synthetic events for the batch through the chained router
+        # 2) block until completed and sweep up the collected inserts
+        event_futures = self.router.submit(chunk_events)
+
+        # note: we structure this future waiting like this for the TQDM view
+        results = []
+        for future in tqdm(
+            as_completed(event_futures),
+            total=chunk_size,
+            desc=f'Awaiting chunk futures [submitted {len(event_futures)}/{chunk_size}]'
+        ):
+            try:
+                results.append(future.result())
+            except Exception as e:
+                logger.warning(f"Sync job failed with exception {e}")
+
+        return results
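
For context, a rough usage sketch of how the pieces introduced here are meant to
compose (not part of the patch). All names below are illustrative assumptions: the
concrete co3 Database, the endpoint path and glob, the PathRouter.register()
signature, and the chunked sync() driver presumed on co3's Syncer base are not
confirmed by this diff.

    # hypothetical wiring sketch; names and signatures here are assumptions
    from co3 import Database

    from execlog import PathRouter
    from execlog.syncers import PathDiffer, PathRouterSyncer

    db = Database()                          # stand-in: any concrete co3 Database

    router = PathRouter()
    router.register(                         # assumed (endpoint, callback, glob) shape
        '/data/notes',
        lambda event: print(event.name),     # callback fired per synthetic event
        '**/*.md',
    )

    syncer = PathRouterSyncer(PathDiffer(db), router)

    # assumed driver on co3's Syncer base: diff disk against the database, build
    # synthetic CREATE/MODIFY/DELETE events via the handle_* methods, and push
    # them through the router in batches via process_chunk()
    syncer.sync(chunk_size=100)

Note that, per the handle_r_excl caveat above, DELETE events only reach callbacks
whose registered endpoints still cover the deleted path.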