405 lines
19 KiB
Python
405 lines
19 KiB
Python
import os
|
|
import time
|
|
import select
|
|
import logging
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
from colorama import Fore, Back, Style
|
|
from inotify_simple import INotify, Event as iEvent, flags as iflags, masks as imasks
|
|
|
|
from execlog import util
|
|
from execlog.util.generic import color_text
|
|
from execlog.event import FileEvent
|
|
from execlog.listener import Listener
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class PathListener(Listener[FileEvent]):
|
|
def __init__(self, router):
|
|
'''
|
|
Parameters:
|
|
router: associated Router instance that events should be passed to
|
|
|
|
Note:
|
|
Due to the nature of INotify, you cannot watch the same path with two
|
|
separate flag settings (without creating a new INotify instance). Under the
|
|
same instance, calling ``add_watch`` for an already watched path location will
|
|
simply return the watch descriptor already associated with that location (and
|
|
may update the flags to whatever is passed). However, this location will only
|
|
ever be "watched once" by a given INotify instance, so keep this in mind if
|
|
multiple downstream callbacks (packaged up by the Router) want to react to the
|
|
same path but with different flags (i.e., it won't work as expected).
|
|
'''
|
|
super().__init__(router)
|
|
|
|
self.started = False
|
|
|
|
self.pathmap : dict[str, int] = {}
|
|
self.canonmap : dict[int, tuple] = {}
|
|
self.watchmap : dict[int, dict[tuple, int]] = defaultdict(lambda: defaultdict(int))
|
|
|
|
self.unmatched_move_froms : dict[int, dict[str, str]] = defaultdict(dict)
|
|
|
|
self.inotify = INotify()
|
|
|
|
self.read_fd, write_fd = os.pipe()
|
|
self.write = os.fdopen(write_fd, 'wb')
|
|
|
|
def _add_watch(
|
|
self,
|
|
path,
|
|
flags,
|
|
lead=None,
|
|
remove=False,
|
|
):
|
|
if lead is None: lead = ''
|
|
path = Path(path)
|
|
lead = Path(lead)
|
|
|
|
wd = None
|
|
fullpath = Path(path, lead)
|
|
try:
|
|
wd = self.inotify.add_watch(str(fullpath), flags)
|
|
self.watchmap[wd][(path, lead)] |= flags
|
|
self.pathmap[fullpath] = wd
|
|
except FileNotFoundError:
|
|
logger.error(f'Directory not found for [{fullpath}] when attempting to watch')
|
|
|
|
return wd
|
|
|
|
def _add_watch_recursive(
|
|
self,
|
|
path,
|
|
flags,
|
|
lead=None,
|
|
remove=False,
|
|
):
|
|
'''
|
|
Recursively watch directories under path/lead, using ``path`` as the registered
|
|
base. Specifying ``lead`` gives one control over the subdirectory on which to
|
|
recurse; the "origin" the recursion base point.
|
|
|
|
Note: on renamed/moved directories
|
|
This method is used to reset the mapped path/lead tuples pointed to by certain
|
|
watch descriptors when a directory is renamed. iNotify will fire a MOVED event
|
|
for the directory (MOVED_FROM/MOVED_TO), but will use the same watch
|
|
descriptor for the new directory as it did the old. This will leave all watch
|
|
dynamics intact, but new file events will fire and send the old base path to
|
|
the router. Explicitly re-watching the renamed directory (and any
|
|
subdirectories) will also return that existing watch descriptor. Thus, this
|
|
method can just be directly called for directory moves/renames, and WDs in the
|
|
``watchmap`` will just be used as expected. (One should also call the method
|
|
using the old lead and set ``remove=True`` to remove old tuples out of the
|
|
``watchmap``. Note that this doesn't remove the existing watches from iNotify,
|
|
just their tracked tuples.)
|
|
'''
|
|
if lead is None:
|
|
lead = ''
|
|
|
|
wds = []
|
|
origin = Path(path, lead)
|
|
for subdir in [origin, *util.path.iter_glob_paths('**/', origin)]:
|
|
lead = subdir.relative_to(Path(path))
|
|
wd = self._add_watch(path, flags, lead=lead, remove=remove)
|
|
if wd is not None:
|
|
wds.append(wd)
|
|
return wds
|
|
|
|
def listen(
|
|
self,
|
|
path,
|
|
flags=None,
|
|
):
|
|
'''
|
|
Listen to file events occurring under a provided path, optionally excluding those
|
|
not matching the provided iNotify flags.
|
|
|
|
Parameters:
|
|
path: Path (directory) to watch with ``inotify``
|
|
flags: inotify_simple flags matching FS event types allowed to trigger the
|
|
callback
|
|
'''
|
|
#path = Path(path)
|
|
path = str(path)
|
|
|
|
if flags is None:
|
|
flags = iflags.CREATE | iflags.DELETE | iflags.MODIFY | iflags.DELETE_SELF | iflags.MOVED_TO
|
|
|
|
# ensure flags can detect directory events
|
|
flags |= iflags.CREATE | iflags.MOVED_FROM | iflags.MOVED_TO
|
|
|
|
wds = self._add_watch_recursive(path, flags)
|
|
|
|
try:
|
|
self.canonmap[wds[0]] = (path, flags)
|
|
except IndexError:
|
|
logger.error(f'Path {path} returned no INotify watch descriptors')
|
|
raise
|
|
|
|
def run(self):
|
|
'''
|
|
Start the (blocking) iNotify event loop
|
|
|
|
Note: On usage
|
|
``start()`` is a blocking call. This will hog your main thread if not properly
|
|
threaded. If handling this manually in your outer context, you will also need
|
|
to make sure to call ``.stop()``
|
|
'''
|
|
self.started = True
|
|
logger.info(
|
|
color_text(
|
|
f'Starting listener for {len(self.watchmap)} paths',
|
|
Fore.GREEN,
|
|
)
|
|
)
|
|
|
|
for path, flags in self.canonmap.values():
|
|
logger.info(f'> Listening on path {path} for flags {iflags.from_mask(flags)}')
|
|
|
|
for (callback, pattern, debounce, delay, *_) in self.router.routemap[path]:
|
|
logger.info(
|
|
color_text(
|
|
f'| > {pattern} -> {callback.__name__} (debounce {debounce}ms, delay {delay}ms)',
|
|
Style.DIM,
|
|
)
|
|
)
|
|
|
|
while True:
|
|
rlist, _, _ = select.select(
|
|
[self.inotify.fileno(), self.read_fd], [], []
|
|
)
|
|
|
|
if self.inotify.fileno() in rlist:
|
|
events = self.inotify.read(timeout=0)
|
|
self.handle_events(events)
|
|
|
|
# check for written stop byte
|
|
if self.read_fd in rlist:
|
|
os.close(self.read_fd)
|
|
self.inotify.close()
|
|
return
|
|
|
|
def update_moved_from(self, path, lead):
|
|
'''
|
|
Update directories on ``MOVED_FROM`` events.
|
|
|
|
.. admonition:: Additional details
|
|
|
|
This method gets the existing WD, removes the old path associated with that WD
|
|
from the ``watchmap`` (preventing events originating from this old path when the
|
|
new path, which has the *same WD*, receives an inotify event), and queues the (WD,
|
|
base path) tuple to be matched later in a ``MOVED_TO`` handler.
|
|
|
|
This method isn't a part of a ``MOVED_TO`` handler because it may be called
|
|
without ever having a ``MOVED_TO`` that follows up. We respond right away in
|
|
``handle_events`` to ``MOVED_FROM`` events, keeping the ``watchmap`` in sync,
|
|
regardless of whether we can expect a ``MOVED_TO`` to sweep through after the
|
|
fact.
|
|
|
|
Note that the ``lead`` is unique for a given WD and base path. WDs are unique for
|
|
filepaths, but inotify uses the same WD for new directories when they experience a
|
|
rename (it's the same inode). However, during such a transition, the ``watchmap``
|
|
can see two different entries for the same WD and basepath: the old tracked path,
|
|
and the newly named one (again, still the same inode). So: this method can be
|
|
called 1) directly from ``MOVED_FROM`` events, preemptively wiping the old path
|
|
from the tracked dicts, or 2) during handling of a ``MOVED_TO`` event (in case we
|
|
don't allow ``MOVED_FROM`` events, for instance), given both the new and old paths
|
|
can be seen in the ``watchmap``.
|
|
'''
|
|
wd = self.pathmap.get(Path(path, lead))
|
|
logger.debug(f'> MOVED_FROM update, [{Path(path, lead)}] in pathmap as [{wd}]')
|
|
if wd is None: return
|
|
|
|
if self.watchmap[wd].pop((path, lead), None):
|
|
logger.debug(f'> MOVED_FROM update, popped from watchmap')
|
|
self.unmatched_move_froms[wd][path] = lead
|
|
|
|
def update_moved_to(self, path, lead):
|
|
'''
|
|
Construct synthetic MOVED events. Events are constructed from the path's WD. If
|
|
the provided path is not watched, an empty list of events is returned.
|
|
|
|
.. admonition:: Design details
|
|
|
|
This method is nuanced. It can only be called once a ``MOVED_TO`` occurs, since
|
|
we can't act on a ``MOVED_FROM`` (we don't know the new target location to look
|
|
so we can send file events). When called, we first look for the path's WD in
|
|
the ``pathmap``. We then check if this WD points to more than one entry with the
|
|
same base path (WDs are unique to the path; under the same WD, the same base
|
|
path implies the same lead). If so, we know one is the outdated path, and we
|
|
push the outdated lead to ``update_moved_from``. This would be evidence that the
|
|
``MOVED_FROM`` event for the move operation wasn't handled in the main event
|
|
handling loop. We then check for unmatched move-froms, which should provide
|
|
any renamed directories, regardless of whether ``MOVED_FROMs`` were allowed, to
|
|
be detected. Finally, the appropriate ``MOVED_FROMs`` and ``MOVED_TOs`` are
|
|
handled. To ensure only the correct events match upon handling, we do the
|
|
following:
|
|
|
|
- First, if a ``MOVED_FROM`` path is not available, we assume it wasn't queued
|
|
by the event and not a watched flag. Given we by default ensure MOVED events
|
|
are tracked, regardless of listened paths, this shouldn't be possible, but
|
|
if this standard were to change, we won't recursively respond to
|
|
``MOVED_FROMs``. This will mean that we can't prevent events from being
|
|
matched to old directory names (we've rooted out the ability to tell when
|
|
they've changed), and thus can't remove them from the ``watchpath``
|
|
accordingly. (One functional caveat here: this MOVED_TO handling method
|
|
explicitly calls ``updated_moved_from``, which should clean up lingering
|
|
renamed path targets. This happens recursively if we're watching MOVED_TOs,
|
|
so even if standards do change and you don't watch ``MOVED_FROMs``, you'll
|
|
still get clean up for free due to the robustness of this method.
|
|
- If a ``MOVED_FROM`` lead is found, either due to an inferred matching base
|
|
lingering in the ``watchmap`` or through previously handled ``MOVED_FROM``
|
|
response, add this path/lead back to the ``watchmap``, remove the new
|
|
path/lead, and call ``handle_events`` for the synthetic ``MOVED_FROM`` events
|
|
across files and directories. Once finished, again remove the old path/lead
|
|
and add back the new one.
|
|
- Submit ``MOVED_TO`` events to ``handle_events``. This will recursively propagate for
|
|
subdirectories, each submitting their own ``update_moved_to`` call, resetting
|
|
its own outdated leads and changing them back, all the way down to the
|
|
bottom.
|
|
|
|
In the odd case where ``MOVED_FROM`` is registered but not ``MOVED_TO``, you will
|
|
simply remove the directory causing a ``MOVED_FROM`` event, with no recursive
|
|
propagation. This should likely be changed.
|
|
'''
|
|
fullpath = Path(path, lead)
|
|
|
|
wd = self.pathmap.get(fullpath)
|
|
if wd is None:
|
|
logger.debug(f'Directory [{fullpath}] moved, but is not watched, ignoring')
|
|
return []
|
|
|
|
# inspect registered paths with same WD -- looking for same base path but diff lead
|
|
# will be empty if explicitly handled by a MOVED_FROM -- else inferred from watchmap
|
|
matching_bases = [pl for pl in self.watchmap[wd] if pl[0] == path and pl[1] != lead]
|
|
|
|
# there should be at most one of these, but handle iteratively
|
|
for matching_base, old_lead in matching_bases:
|
|
self.update_moved_from(matching_base, old_lead)
|
|
|
|
# explicit queries for files & dirs faster (tested) than filtering a single query
|
|
# using ``Path.is_dir``; handle files, then subdirectories
|
|
moved_from_events = []
|
|
moved_to_events = []
|
|
for file in util.path.iter_glob_paths('*', fullpath, no_dir=True):
|
|
moved_from_events.append(iEvent(wd=wd, mask=iflags.MOVED_FROM, cookie=0, name=file.name))
|
|
moved_to_events.append(iEvent(wd=wd, mask=iflags.MOVED_TO, cookie=0, name=file.name))
|
|
|
|
for subdir in util.path.iter_glob_paths('*/', fullpath):
|
|
moved_from_mask = iflags.MOVED_FROM | iflags.ISDIR
|
|
moved_from_events.append(iEvent(wd=wd, mask=moved_from_mask, cookie=0, name=subdir.name))
|
|
|
|
moved_to_mask = iflags.MOVED_TO | iflags.ISDIR
|
|
moved_to_events.append(iEvent(wd=wd, mask=moved_to_mask, cookie=0, name=subdir.name))
|
|
|
|
# check for unmatched moved froms -- should be enqueued in event loop or just above
|
|
moved_from_lead = self.unmatched_move_froms.get(wd, {}).pop(path, None)
|
|
if moved_from_lead is None:
|
|
logger.debug(f'Couldn\'t find MOVED_FROM origin, just yielding MOVED_TO events')
|
|
else:
|
|
# temporarily remove new path, add old path to allow MOVED_FROMs to seep through
|
|
flags = self.watchmap[wd].pop((path, lead)) # remove new
|
|
self.watchmap[wd][(path, moved_from_lead)] = flags # add old
|
|
self.handle_events(moved_from_events)
|
|
|
|
self.watchmap[wd].pop((path, moved_from_lead)) # remove old
|
|
self.watchmap[wd][(path, lead)] = flags # add back new
|
|
|
|
self.handle_events(moved_to_events)
|
|
|
|
def handle_events(self, events):
|
|
'''
|
|
Note:
|
|
If ``handle_events`` is called externally, note that this loop will block in the
|
|
calling thread until the jobs have been submitted. It will *not* block until
|
|
jobs have completed, however, as a list of futures is returned. The calling
|
|
Watcher instance may have already been started, in which case ``run()`` will
|
|
already be executing in a separate thread. Calling this method externally will
|
|
not interfere with this loop insofar as it adds jobs to the same thread pool.
|
|
|
|
Because this method only submits jobs associated with the provided ``events``,
|
|
the calling thread can await the returned list of futures and be confident
|
|
that top-level callbacks associated with these file events have completed. Do
|
|
note that, if the Watcher has already been started, any propagating file
|
|
events will be picked up and possibly processed simultaneously (although their
|
|
associated callbacks will have nothing to do with the returned list of futures).
|
|
'''
|
|
from execlog.router import Event
|
|
|
|
for event in events:
|
|
# hard coded ignores
|
|
if util.path.glob_match(event.name, util.path.IGNORE_PATTERNS): continue
|
|
|
|
mask_flags = iflags.from_mask(event.mask)
|
|
|
|
if event.wd not in self.watchmap:
|
|
raise ValueError(f'Watcher fired for untracked descriptor origin: {event}')
|
|
|
|
moved_froms = []
|
|
moved_tos = []
|
|
for (path, lead), flags in self.watchmap[event.wd].items():
|
|
relpath = Path(lead, event.name)
|
|
abspath = Path(path, relpath)
|
|
|
|
# add new directories
|
|
if iflags.ISDIR in mask_flags:
|
|
if iflags.CREATE in mask_flags:
|
|
logger.debug(f'New directory detected [{relpath}]')
|
|
self._add_watch_recursive(path, flags, lead=relpath)
|
|
|
|
if iflags.MOVED_FROM in mask_flags:
|
|
moved_froms.append((path, relpath))
|
|
|
|
if iflags.MOVED_TO in mask_flags:
|
|
moved_tos.append((path, relpath))
|
|
|
|
continue
|
|
|
|
logger.debug(f'Watcher fired for [{relpath}]: {mask_flags}')
|
|
|
|
route_event = Event(endpoint=str(path), name=str(relpath), action=mask_flags)
|
|
self.router.submit(route_event)
|
|
|
|
# handle renamed directories; old dir was being watched if these flags
|
|
# match. The same WD is used by iNotify for the new dir, so
|
|
# recursively update explicitly stored paths.
|
|
for path, lead in moved_froms:
|
|
logger.debug(f'Directory moved, removing old [{lead}]')
|
|
self.update_moved_from(path, lead)
|
|
|
|
for path, lead in moved_tos:
|
|
logger.debug(f'Directory moved, adding new [{lead}]')
|
|
self._add_watch(path, flags, lead=lead)
|
|
self.update_moved_to(path, lead)
|
|
|
|
def stop(self):
|
|
'''
|
|
Shutdown active listener processes, including the attached router thread pool and
|
|
the iNotify event loop.
|
|
|
|
Note:
|
|
Shutting down the thread pool will wait until pending futures are finished
|
|
executing before actually returning. A common source of error is having the
|
|
main process exit before final tasks can be submitted, resulting in
|
|
RuntimeErrors that cannot "schedule new futures after interpreter shutdown."
|
|
So you either need to ensure the final tasks are scheduled before calling
|
|
``stop()`` (this means more than just a ``submit()`` call; it must have actually
|
|
propagated through to ``submit_callback`` and reached ``thread_pool.submit``) to
|
|
allow them to be handled automatically prior to shutdown, or manually wait on
|
|
their futures to complete. Otherwise, thread pool shutdown will occur, and
|
|
they'll still be making their way out of the queue only to reach the
|
|
``thread_pool.submit`` after it's had its final boarding call.
|
|
'''
|
|
logger.info("Stopping listener...")
|
|
|
|
# request INotify stop by writing in the pipe, checked in watch loop
|
|
if not self.write.closed:
|
|
self.write.write(b"\x00")
|
|
self.write.close()
|
|
|
|
if self.router.thread_pool is not None:
|
|
self.router.thread_pool.shutdown()
|
|
|