execlib/execlog/syncers/router.py

192 lines
7.6 KiB
Python
Raw Permalink Normal View History

import logging
import concurrent
from pathlib import Path
from concurrent.futures import as_completed
from tqdm import tqdm
from colorama import Fore, Back, Style
from inotify_simple import flags as iflags
from co3.resources import DiskResource
from co3 import Differ, Syncer, Database
from execlog.event import Event
from execlog.router import CancelledFrameError
from execlog.routers import PathRouter
from execlog.util.generic import color_text
logger = logging.getLogger(__name__)
class PathDiffer(Differ[Path]):
def __init__(
self,
database: Database,
):
super().__init__(DiskResource(), database)
def l_transform(self, item):
'''
Transform ``(path, head)`` tuple from ``DiskResource``.
'''
return Path(*item)
class PathRouterSyncer(Syncer[Path]):
def __init__(
self,
differ: PathDiffer,
router: PathRouter,
):
super().__init__(differ)
self.router = router
def _construct_event(
self,
fpath: str | Path,
endpoint: str | Path,
action: bytes
):
return Event(
endpoint=str(endpoint),
name=str(Path(fpath).relative_to(endpoint)),
action=[action], # synthetic action to match any flag filters
)
def handle_l_excl(self, path: Path, disk_pairs: list):
'''
Handle disk exclusive paths (i.e., those added to disk since last sync).
'''
return [
self._construct_event(str(path), endpoint, iflags.CREATE)
for endpoint, _ in disk_pairs
]
def handle_r_excl(self, path: Path, db_vals: list):
'''
Handle database exclusive paths (i.e., those deleted from disk since last sync).
Searches for matching endpoints under the attached router and creates
corresponding events.
.. admonition:: On lack of endpoints
This method handles database exclusive items, i.e., paths no longer on disk
but still in the database. For typical Router designs, it is not important to
preserve possible endpoints of origin for this kind of event; what matters is
the absolute path of the file to be removed. In general, file events are
associated solely with a path, but we in some cases may be sensitive to the
base path seen to be "triggering" that file event, as router methods can hook
in to specific endpoints. This has somewhat dubious effects, as multiple
events (with the same action) are dispatched for the same file, purely to
observe the Router convention of endpoints and allowing independent
trajectories through the execution sequence.
One concern here is that you might, in theory, want to respond to the same
file deletion event in different ways under different endpoints. This will be
accessible when picking up such an event live, as endpoints are grouped by
watch descriptor and can be all be triggered from the single file event. This
is the one case where we can't *really* simulate the event taking place with
the available data, and instead have to peer into the router to see what root
paths the file could theoretically trigger. Most of the time, this won't be
too problematic, since we'll be watching the same paths and can tell where a
deleted file would've been. But there are cases where a watch path endpoint
may be abandoned, and thus no callback will be there to receive the DELETE
event. *Routers should heavily consider implementing a global DELETE handler
to prevent these cases if it's critical to respond to deletions.* Otherwise,
we still make an attempt to propagate under appropriate endpoints, allowing
for possible "deconstructor-like" behavior of specific filetypes (e.g.,
cleaning up auxiliary elements, writing to a log, creating a backup, etc).
'''
return [
self._construct_event(str(path), str(endpoint), iflags.DELETE)
for endpoint in self.router.routemap
if Path(path).is_relative_to(Path(endpoint))
]
def handle_lr_int(self, path: Path, path_tuples: tuple[list, list]):
'''
Handle paths reflected both in the database and on disk.
Paths only reach this method if still present after being passed through
``filter_diff_sets``, which will filter out those files that are up-to-date in the
database.
'''
return [
self._construct_event(str(path), endpoint, iflags.MODIFY)
for endpoint, _ in path_tuples[0]
]
def filter_diff_sets(self, l_excl, r_excl, lr_int):
total_disk_files = len(l_excl) + len(lr_int)
total_joint_files = len(lr_int)
def file_out_of_sync(p):
_, db_el = lr_int[p]
db_mtime = float(db_el[0].get('mtime','0'))
disk_mtime = Path(p).stat().st_mtime
return disk_mtime > db_mtime
lr_int = {p:v for p,v in lr_int.items() if file_out_of_sync(p)}
# compute out-of-sync details
oos_count = len(l_excl) + len(lr_int)
oos_prcnt = oos_count / max(total_disk_files, 1) * 100
logger.info(color_text(f'{len(l_excl)} new files to add', Fore.GREEN)),
logger.info(color_text(f'{len(lr_int)} modified files [{total_joint_files} up-to-date]', Fore.YELLOW)),
logger.info(color_text(f'{len(r_excl)} files to remove', Fore.RED)),
logger.info(color_text(f'({oos_prcnt:.2f}%) of disk files out-of-sync', Style.DIM)),
return l_excl, r_excl, lr_int
def process_chunk(self, event_sets):
chunk_events = [e for event_set in event_sets for e in event_set]
# 1) flush synthetic events for the batch through the chained router
# 2) block until completed and sweep up the collected inserts
event_futures = self.router.submit(chunk_events)
# note: we structure this future waiting like this for the TQDM view
results = []
success = 0
cancelled = 0
errored = 0
submitted = len(event_futures)
progress_bar = tqdm(
total=len(event_futures),
desc=f'Awaiting chunk futures [submitted {len(event_futures)}]'
)
for future in as_completed(event_futures):
try:
callback_future_list = future.result()
results.append(callback_future_list)
success += 1
except concurrent.futures.CancelledError as e:
cancelled += 1
#logger.error(f'Event future cancelled; "{e}"')
except CancelledFrameError as e:
errored += 1
pass
#logger.error(f'Event frame cancelled; "{e}"')
except Exception as e:
errored += 1
logger.warning(f'Sync job failed with unknown exception "{e}"')
suc_txt = color_text(f'{success}', Fore.GREEN)
can_txt = color_text(f'{cancelled}', Fore.YELLOW)
err_txt = color_text(f'{errored}', Fore.RED)
tot_txt = color_text(f'{success+cancelled+errored}', Style.BRIGHT)
progress_bar.set_description(
f'Awaiting chunk futures [{tot_txt} / {submitted} | {suc_txt} {can_txt} {err_txt}]'
)
progress_bar.update(n=1)
progress_bar.close()
return results
def shutdown(self):
super().shutdown()
self.router.shutdown()