implement PathRouterSyncer for file-based diff management, make RouterBuilder type ChainRouter

This commit is contained in:
Sam G. 2024-05-11 03:12:41 -07:00
parent 099ebad566
commit 16e2e94e2a
6 changed files with 199 additions and 25 deletions

View File

@ -1,5 +1,6 @@
from execlog import util
from execlog import routers
from execlog import syncers
from execlog import listeners
from execlog.server import Server

View File

@ -159,9 +159,13 @@ class PathListener(Listener[FileEvent]):
logger.info(f'> Listening on path {path} for flags {iflags.from_mask(flags)}')
for (callback, pattern, debounce, delay, *_) in self.router.routemap[path]:
callback_name = str(callback)
if hasattr(callback, '__name__'):
callback_name = callback.__name__
logger.info(
color_text(
f'| > {pattern} -> {callback.__name__} (debounce {debounce}ms, delay {delay}ms)',
f'| > {pattern} -> {callback_name} (debounce {debounce}ms, delay {delay}ms)',
Style.DIM,
)
)
@ -315,19 +319,17 @@ class PathListener(Listener[FileEvent]):
If ``handle_events`` is called externally, note that this loop will block in the
calling thread until the jobs have been submitted. It will *not* block until
jobs have completed, however, as a list of futures is returned. The calling
Watcher instance may have already been started, in which case ``run()`` will
Listener instance may have already been started, in which case ``run()`` will
already be executing in a separate thread. Calling this method externally will
not interfere with this loop insofar as it adds jobs to the same thread pool.
Because this method only submits jobs associated with the provided ``events``,
the calling thread can await the returned list of futures and be confident
that top-level callbacks associated with these file events have completed. Do
note that, if the Watcher has already been started, any propagating file
note that, if the Listener has already been started, any propagating file
events will be picked up and possibly processed simultaneously (although their
associated callbacks will have nothing to do with the returned list of futures).
'''
from execlog.router import Event
for event in events:
# hard coded ignores
if util.path.glob_match(event.name, util.path.IGNORE_PATTERNS): continue
@ -359,7 +361,7 @@ class PathListener(Listener[FileEvent]):
logger.debug(f'Watcher fired for [{relpath}]: {mask_flags}')
route_event = Event(endpoint=str(path), name=str(relpath), action=mask_flags)
route_event = FileEvent(endpoint=str(path), name=str(relpath), action=mask_flags)
self.router.submit(route_event)
# handle renamed directories; old dir was being watched if these flags

View File

@ -96,7 +96,7 @@ class Router[E: Event]:
self.loop = loop
self.workers = workers
self.routemap : dict[str, list[tuple]] = defaultdict(list)
self.routemap : dict[str, list[tuple]] = defaultdict(list)
self.post_callbacks = []
# track running jobs by event
@ -592,7 +592,7 @@ class RouteRegistryMeta(type):
return super().__new__(cls, name, bases, attrs)
class RouterBuilder(metaclass=RouteRegistryMeta):
class RouterBuilder(ChainRouter, metaclass=RouteRegistryMeta):
'''
Builds a (Chain)Router using attached methods and passed options.
@ -656,9 +656,11 @@ class RouterBuilder(metaclass=RouteRegistryMeta):
register_map: dict[str, tuple[Router, dict[str, tuple[tuple[str, str], dict[str, Any]]]]],
):
self.register_map = register_map
routers = []
# register
for router_name, (router, router_options) in self.register_map.items():
routers.append(router)
for route_group, method_arg_list in self.route_registry[router_name].items():
# get post-callbacks for reserved key "post"
# assumed no kwargs for passthrough
@ -686,6 +688,10 @@ class RouterBuilder(metaclass=RouteRegistryMeta):
}
)
def get_router(self, router_key_list: list[str]):
return ChainRouter([self.register_map[k][0] for k in router_key_list])
super().__init__(routers)
# -- disabling for now to inherit from ChainRouter directly. Require the order to
# -- simply be specified by the order of the router keys in the register_map
# def get_router(self, router_key_list: list[str]):
# return ChainRouter([self.register_map[k][0] for k in router_key_list])

View File

@ -46,24 +46,40 @@ class PathRouter(Router[FileEvent]):
def filter(self, event, glob, **listen_kwargs) -> bool:
'''
Note:
If ``handle_events`` is called externally, note that this loop will block in the
calling thread until the jobs have been submitted. It will _not_ block until
jobs have completed, however, as a list of futures is returned. The calling
Watcher instance may have already been started, in which case ``run()`` will
already be executing in a separate thread. Calling this method externally will
not interfere with this loop insofar as it adds jobs to the same thread pool.
Filter path events based on the provided glob pattern and listen arguments.
Because this method only submits jobs associated with the provided ``events``,
the calling thread can await the returned list of futures and be confident
that top-level callbacks associated with these file events have completed. Do
note that, if the Watcher has already been started, any propagating file
events will be picked up and possibly process simultaneously (although their
associated callbacks will have nothing to do with the return list of futures).
This method is needed due to the lack of granularity when you have separate router
callbacks that listen to the same directory (or overlap on some nested directory
therein) with *different listen flags*. The overlapping path in question will only
ever be assigned a single watch descriptor by iNotify, but will (or at least appears
to) add (via bitwise OR) new flags if the same path is registered. Thus, an event
fired by iNotify cannot be automatically propagated to the registered callbacks,
as the file event "action" may apply only to a subset of those functions. This is
the place for that final delineation, ensuring the exact action is matched before
callback execution. This has the benefit of being a suitable proxy for the actual
iNotify filtering that takes place when submitting synthetic events to the router
by hand.
**Bigger picture, and why we have to reproduce the work already done by an
event-based mechanism like iNotify**: Realistically, such a method is needed
regardless if we hope to connect to the threaded router model as we do not
canonically store callback associations at the listener level. If our responses
could be tied one-to-one to the sensitivities of iNotify events, then they could
be called directly in response to them. But they are not: we want to support
glob-based filtering, need to delineate by flags as explained above, and can have
separate endpoints for the same path. These are conditions *not* collapsed at the
iNotify level, and thus need to be fully re-implemented for callback matching.
(For example, imagine we had callback uniqueness on just endpoint and glob, i.e.,
no sensitivity to flags, then the flag-matching conditions implemented here would
not be needed to rightly pass iNotify events to their callbacks. In such a case,
we could rely fully on iNotify's flag response model to implicitly handle this
aspect of the filtering process. If the same could be said the remaining
constraints, then as mentioned, we could simply associate callbacks one-to-one and
avoid the auxiliary filtering altogether.)
Parameters:
event : Event instance
glob : Single string or tuple of glob patterns to check against event endpoint
event: Event instance
glob: Single string or tuple of glob patterns to check against event endpoint
'''
not_tmp_glob = '**/!(.*|*.tmp|*~)'
if not glob_match(Path(event.name), not_tmp_glob):

View File

@ -0,0 +1 @@
from execlog.syncers.router import PathDiffer, PathRouterSyncer

148
execlog/syncers/router.py Normal file
View File

@ -0,0 +1,148 @@
from pathlib import Path
from co3.resources import DiskResource
from co3 import Differ, Syncer, Database
from execlog.event import Event
from execlog.routers import PathRouter
class PathDiffer(Differ[Path]):
def __init__(
self,
database: Database,
):
super().__init__(DiskResource(), database)
def l_transform(self, item):
'''
Transform ``(path, head)`` tuple from ``DiskResource``.
'''
return Path(*item)
class PathRouterSyncer(Syncer[Path]):
def __init__(
self,
differ: PathDiffer,
router: PathRouter,
):
super().__init__(differ)
self.router = router
def _construct_event(
self,
fpath: str | Path,
endpoint: str | Path,
action: bytes
):
return Event(
endpoint=str(endpoint),
name=str(Path(fpath).relative_to(endpoint)),
action=[action], # synthetic action to match any flag filters
)
def handle_l_excl(self, path: Path, disk_pairs: list):
'''
Handle disk exclusive paths (i.e., those added to disk since last sync).
'''
return [
self._construct_event(str(path), endpoint, iflags.CREATE)
for endpoint, _ in disk_pairs
]
def handle_r_excl(self, path: Path, db_vals: list):
'''
Handle database exclusive paths (i.e., those deleted from disk since last sync).
Searches for matching endpoints under the attached router and creates
corresponding events.
.. admonition:: On lack of endpoints
This method handles database exclusive items, i.e., paths no longer on disk
but still in the database. For typical Router designs, it is not important to
preserve possible endpoints of origin for this kind of event; what matters is
the absolute path of the file to be removed. In general, file events are
associated solely with a path, but we in some cases may be sensitive to the
base path seen to be "triggering" that file event, as router methods can hook
in to specific endpoints. This has somewhat dubious effects, as multiple
events (with the same action) are dispatched for the same file, purely to
observe the Router convention of endpoints and allowing independent
trajectories through the execution sequence.
One concern here is that you might, in theory, want to respond to the same
file deletion event in different ways under different endpoints. This will be
accessible when picking up such an event live, as endpoints are grouped by
watch descriptor and can be all be triggered from the single file event. This
is the one case where we can't *really* simulate the event taking place with
the available data, and instead have to peer into the router to see what root
paths the file could theoretically trigger. Most of the time, this won't be
too problematic, since we'll be watching the same paths and can tell where a
deleted file would've been. But there are cases where a watch path endpoint
may be abandoned, and thus no callback will be there to receive the DELETE
event. *Routers should heavily consider implementing a global DELETE handler
to prevent these cases if it's critical to respond to deletions.* Otherwise,
we still make an attempt to propagate under appropriate endpoints, allowing
for possible "deconstructor-like" behavior of specific filetypes (e.g.,
cleaning up auxiliary elements, writing to a log, creating a backup, etc).
'''
return [
self._construct_event(str(path), str(endpoint), iflags.DELETE)
for endpoint in self.router.routemap
if Path(path).is_relative_to(Path(endpoint))
]
def handle_lr_int(self, path: Path, path_tuples: tuple[list, list]):
'''
Handle paths reflected both in the database and on disk.
Paths only reach this method if still present after being passed through
``filter_diff_sets``, which will filter out those files that are up-to-date in the
database.
'''
return [
self._construct_event(str(path), endpoint, iflags.MODIFY)
for endpoint, _ in path_tuples[1]
]
def filter_diff_sets(self, l_excl, r_excl, lr_int):
total_disk_files = len(l_excl) + len(lr_int)
def file_out_of_sync(p):
db_el, disk_el = lr_int[p]
db_mtime = float(db_el[0].get('mtime','0'))
disk_mtime = File(p, disk_el[0]).mtime
return disk_mtime > db_mtime
lr_int = {p:v for p,v in lr_int.items() if file_out_of_sync(p)}
# compute out-of-sync details
oos_count = len(l_excl) + len(lr_int)
oos_prcnt = oos_count / max(total_disk_files, 1) * 100
logger.info(color_text(Fore.GREEN, f'{len(l_excl)} new files to add'))
logger.info(color_text(Fore.YELLOW, f'{len(lr_int)} modified files'))
logger.info(color_text(Fore.RED, f'{len(r_excl)} files to remove'))
logger.info(color_text(Style.DIM, f'({oos_prcnt:.2f}%) of disk files out-of-sync'))
return l_excl, r_excl, lr_int
def process_chunk(self, event_sets):
chunk_events = [e for event_set in event_sets for e in event_set]
# 1) flush synthetic events for the batch through the chained router
# 2) block until completed and sweep up the collected inserts
event_futures = self.router.submit(chunk_events)
# note: we structure this future waiting like this for the TQDM view
results = []
for future in tqdm(
as_completed(event_futures),
total=chunk_size,
desc=f'Awaiting chunk futures [submitted {len(event_futures)}/{chunk_size}]'
):
try:
results.append(future.result())
except Exception as e:
logger.warning(f"Sync job failed with exception {e}")
return results