diff --git a/co3/__init__.py b/co3/__init__.py
index d866f01..5792eda 100644
--- a/co3/__init__.py
+++ b/co3/__init__.py
@@ -101,19 +101,23 @@ their SQLite FTS counterparts.
 from co3.accessor import Accessor
 from co3.co3 import CO3, collate
 from co3.collector import Collector
+from co3.component import Component
 #from co3.composer import Composer
 from co3.database import Database
+from co3.engine import Engine
 from co3.indexer import Indexer
 from co3.manager import Manager
 from co3.mapper import Mapper, ComposableMapper
-from co3.component import Component
 from co3.schema import Schema
-from co3.engine import Engine
+from co3.resource import Resource, SelectableResource
+from co3.differ import Differ
+from co3.syncer import Syncer
 
-from co3 import accessors
-from co3 import databases
-from co3 import managers
-from co3 import components
+from co3 import util
 from co3 import schemas
 from co3 import engines
-from co3 import util
+from co3 import managers
+from co3 import accessors
+from co3 import databases
+from co3 import resources
+from co3 import components
diff --git a/co3/database.py b/co3/database.py
index 0bbd887..0287596 100644
--- a/co3/database.py
+++ b/co3/database.py
@@ -53,14 +53,13 @@ might otherwise be handled explicitly, as seen above.
 above) and group up operations as they please, reducing overhead. The Database then
 wraps up a few single-operation contexts where outer connection control is not needed.
 '''
-
 import logging
 
-from co3.accessor import Accessor
-from co3.manager import Manager
-from co3.indexer import Indexer
 from co3.engine import Engine
 from co3.schema import Schema
+from co3.manager import Manager
+from co3.indexer import Indexer
+from co3.accessor import Accessor
 
 
 logger = logging.getLogger(__name__)
diff --git a/co3/differ.py b/co3/differ.py
new file mode 100644
index 0000000..7fa3f40
--- /dev/null
+++ b/co3/differ.py
@@ -0,0 +1,58 @@
+from typing import Any
+from collections import defaultdict
+from abc import ABCMeta, abstractmethod
+
+from co3.util.types import Equatable
+from co3.resource import SelectableResource
+
+
+class Differ[E: Equatable](metaclass=ABCMeta):
+    '''
+    Compute diff sets (asymmetric exclusives and intersection) among ``Equatable``
+    transformations of results from ``SelectableResource`` instances.
+    '''
+    def __init__(
+        self,
+        l_resource: SelectableResource,
+        r_resource: SelectableResource,
+    ):
+        self.l_resource = l_resource
+        self.r_resource = r_resource
+
+    @abstractmethod
+    def l_transform(self, item) -> E:
+        '''
+        Transform items from the left resource to the joint comparison space, i.e.,
+        to an instance of type ``Equatable``.
+        '''
+        raise NotImplementedError
+
+    @abstractmethod
+    def r_transform(self, item) -> E:
+        raise NotImplementedError
+
+    def diff(
+        self,
+        l_select_kwargs: dict,
+        r_select_kwargs: dict,
+    ) -> tuple[dict[E, Any], dict[E, Any], dict[E, Any]]:
+        l_items = self.l_resource.select(**l_select_kwargs)
+        r_items = self.r_resource.select(**r_select_kwargs)
+
+        l_map: dict[E, list[Any]] = defaultdict(list)
+        r_map: dict[E, list[Any]] = defaultdict(list)
+
+        for item in l_items:
+            l_map[self.l_transform(item)].append(item)
+
+        for item in r_items:
+            r_map[self.r_transform(item)].append(item)
+
+        l_set: set[E] = set(l_map)
+        r_set: set[E] = set(r_map)
+
+        l_excl = { l: l_map[l] for l in l_set - r_set }
+        r_excl = { r: r_map[r] for r in r_set - l_set }
+        lr_int = { i: (l_map[i], r_map[i]) for i in l_set & r_set }
+
+        return l_excl, r_excl, lr_int #, (l_map, r_map)
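To make the Differ contract concrete, here is a minimal sketch (illustrative only, not part of the patch) of a subclass comparing disk paths against database rows; the `NameDiffer` name, the right-resource row shape, and `db_resource` are assumptions:

    from co3.differ import Differ
    from co3.resources import DiskResource

    class NameDiffer(Differ[str]):
        def l_transform(self, item) -> str:
            # DiskResource.select yields (root, relative_path) tuples
            root, rel = item
            return str(rel)

        def r_transform(self, item) -> str:
            # assumed shape: right-resource items are dicts with a 'path' key
            return str(item['path'])

    # differ = NameDiffer(DiskResource(), db_resource)  # db_resource: hypothetical
    # l_excl, r_excl, lr_int = differ.diff(
    #     l_select_kwargs={'path_list': 'notes', 'glob': '**/*.md'},
    #     r_select_kwargs={},
    # )
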
diff --git a/co3/resource.py b/co3/resource.py
new file mode 100644
index 0000000..290aff0
--- /dev/null
+++ b/co3/resource.py
@@ -0,0 +1,9 @@
+from typing import Protocol
+
+
+class Resource:
+    pass
+
+class SelectableResource(Protocol):
+    def select(self, component, *args, **kwargs):
+        raise NotImplementedError
diff --git a/co3/resources/__init__.py b/co3/resources/__init__.py
new file mode 100644
index 0000000..b7ebad3
--- /dev/null
+++ b/co3/resources/__init__.py
@@ -0,0 +1 @@
+from co3.resources.disk import DiskResource
diff --git a/co3/resources/disk.py b/co3/resources/disk.py
new file mode 100644
index 0000000..556b46e
--- /dev/null
+++ b/co3/resources/disk.py
@@ -0,0 +1,30 @@
+from pathlib import Path
+
+from co3.util import paths
+from co3.resource import SelectableResource
+
+
+class DiskResource(SelectableResource):
+    def select(
+        self,
+        path_list: str | Path | list[str | Path],
+        glob: str | None = None
+    ) -> set[tuple[Path, Path]]:
+        iter_path_kwargs = {'relative': True, 'no_dir': True}
+
+        if type(path_list) is not list:
+            path_list = [path_list]
+
+        path_agg = set()
+        for path in path_list:
+            path_union = set()
+
+            if glob is None:
+                path_union = set(paths.iter_nested_paths(path, **iter_path_kwargs))
+            else:
+                path_union = set(paths.iter_glob_paths(glob, path, **iter_path_kwargs))
+
+            # `set.union` returns a new set and discards it; update in place instead
+            path_agg.update((path, head) for head in path_union)
+
+        return path_agg
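Under the corrected `select` above, usage might look like the following sketch (the `data` directory is hypothetical); note that results are `(root, relative_path)` tuples rather than bare paths:

    from pathlib import Path
    from co3.resources import DiskResource

    resource = DiskResource()

    # every non-hidden, non-temp file under ./data, recursively
    all_files = resource.select('data')

    # only Markdown files, taking the glob branch
    md_files = resource.select('data', glob='**/*.md')

    for root, rel in sorted(md_files):
        print(Path(root, rel))   # re-join root and relative head
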
diff --git a/co3/syncer.py b/co3/syncer.py
new file mode 100644
index 0000000..9f9afe4
--- /dev/null
+++ b/co3/syncer.py
@@ -0,0 +1,233 @@
+import time
+import random
+import logging
+from collections import defaultdict
+
+from tqdm import tqdm
+
+from co3.differ import Differ
+from co3.util.types import Equatable
+
+
+logger = logging.getLogger(__name__)
+
+class Syncer[E: Equatable]:
+    def __init__(self, differ: Differ[E]):
+        self.differ = differ
+
+    def handle_l_excl(self, key: E, val: list):
+        return key
+
+    def handle_r_excl(self, key: E, val: list):
+        return key
+
+    def handle_lr_int(self, key: E, val: tuple[list, list]):
+        return key
+
+    def filter_diff_sets(self, l_excl, r_excl, lr_int):
+        return l_excl, r_excl, lr_int
+
+    def process_chunk(self, handler_results):
+        return handler_results
+
+    def _handle_chunk_items(self, membership_items):
+        results = []
+        for membership, item in membership_items:
+            if membership == 0b10:
+                res = self.handle_l_excl(*item)
+            elif membership == 0b01:
+                res = self.handle_r_excl(*item)
+            elif membership == 0b11:
+                res = self.handle_lr_int(*item)
+            else:
+                logger.debug('Incorrect membership for chunk item, skipping')
+                continue
+
+            results.append(res)
+
+        return results
+
+    def sync(
+        self,
+        l_select_kwargs: dict,
+        r_select_kwargs: dict,
+    ) -> list:
+        return self.chunked_sync(l_select_kwargs, r_select_kwargs)
+
+    def chunked_sync(
+        self,
+        l_select_kwargs : dict,
+        r_select_kwargs : dict,
+        chunk_time      : int | None = None,
+        item_limit      : int | None = None,
+        chunk_cap       : int | None = None,
+    ) -> list:
+        '''
+        Sync diff sets through attached handlers in chunks.
+
+        Chunk sizes are determined by a *desired processing duration*, i.e., how long
+        should be spent aggregating items in handlers (``handle_*`` methods) and the
+        subsequent call to ``process_chunk``. This is particularly useful for database
+        driven interactions, where one wants to embrace bulk insertion (saving on
+        repetitive, costly I/O-bound tasks) while performing intermittent consistency
+        checks ("saving progress" along the way by inserting in batches; one could
+        imagine otherwise performing a huge amount of computation only to encounter an
+        error interacting with the database and subsequently roll back the
+        transaction, losing all results).
+
+        Chunk *times* are provided (rather than directly specifying sizes) due to the
+        variable nature of handlers and the items they need to process. For example,
+        if handlers prepare events for bulk submission to an ``execlog.Router``, it's
+        difficult to estimate how long the resulting execution traces will take to
+        complete. A regular file may take 0.01s to finish preparing, whereas an audio
+        file may kick off a 5 minute transcription job. Targeting the aggregate chunk
+        processing time allows us to dynamically readjust batch sizes as more jobs
+        complete.
+
+        A few extra technical remarks:
+
+        - We randomly shuffle the input items to even out any localized bias present
+          in the order of items to be processed. This helps facilitate a more stable
+          estimate of the average chunk duration and promotes ergodicity (a "typical"
+          batch is more representative of the "average" batch).
+        - We employ an exponential moving average over times of completed chunks, more
+          heavily weighting the durations of recently completed chunks when
+          readjusting sizes. This is not because recent jobs reveal more about the
+          average job length (we randomize these as mentioned above), but instead to
+          reflect short-term changes in stationarity due to variable factors like
+          resource allocation. For example, we'd like to quickly reflect the state of
+          the CPU's capacity after the start/end of external processing tasks so we
+          might more accurately estimate batch sizes.
+
+        Note:
+            This could be dangerous when going from fast file processing to slow note
+            processing. Imagine estimating 1000 iter/sec, then transferring that rate
+            to the next batch when it's more like 0.2 iter/sec: we would lose any
+            chunking. (Luckily, in practice, notes are almost always processed before
+            the huge set of nested files lurking behind them, and we don't experience
+            this issue.)
+
+        .. admonition:: Sync strategy
+
+            1. Compute diffs under the provided Differ between supported selection
+               sets for its SelectableResources.
+            2. Perform any additional filtering of the diff sets with
+               ``filter_diff_sets``, producing the final set triplet.
+            3. Begin a chunked processing loop for all items involved in the final
+               diff sets. Items exclusive to the left resource are passed to
+               ``handle_l_excl``, ``handle_r_excl`` for those exclusive to the right
+               resource, and ``handle_lr_int`` for the left-right intersection. Note
+               how this mechanism should inform the implementation of these methods
+               for inheriting subclasses: items are only handled once they are part
+               of an actively processed chunk. We don't first globally process the
+               sets and "clean up" chunk-by-chunk, as this is less stable and can
+               waste compute if we need to exit early.
+
+        Parameters:
+            chunk_time: desired processing time per batch, in seconds
+            item_limit: number of items to sync before exiting
+            chunk_cap:  upper limit on the number of items per chunk. Even if never
+                        reached, setting this can help prevent anomalous, unbounded
+                        time estimates that yield prohibitively sized chunks.
+        '''
+        # calculate diff and filter diff sets
+        l_excl, r_excl, lr_int = self.filter_diff_sets(
+            *self.differ.diff(
+                l_select_kwargs,
+                r_select_kwargs,
+            )
+        )
+
+        # tag items with their diff-set membership
+        items = []
+        items.extend([(0b10, l) for l in l_excl.items()])
+        items.extend([(0b01, r) for r in r_excl.items()])
+        items.extend([(0b11, i) for i in lr_int.items()])
+
+        # check for empty items
+        if not items:
+            logger.info('Sync has nothing to do, exiting')
+            return []
+
+        # mix items for ergodicity
+        random.shuffle(items)
+
+        # if item limit not set, set to all items
+        if item_limit is None:
+            item_limit = len(items)
+
+        # if chunk cap not set, allow it to be the item limit
+        if chunk_cap is None:
+            chunk_cap = item_limit
+
+        # chunk cap may be large, but max it out at the item limit
+        chunk_cap = max(chunk_cap, 1) # ensure >= 1 to avoid infinite loop
+        chunk_cap = min(chunk_cap, item_limit)
+
+        # if chunk time not set, use the largest chunk size: the chunk cap
+        if chunk_time is None:
+            chunk_size = chunk_cap
+        else:
+            # otherwise, assume 1s per item up to the cap size
+            chunk_size = min(chunk_time, chunk_cap)
+
+        # chunk loop variables
+        chunk_timer = 0
+        chunk_report = defaultdict(int)
+        chunk_results = []
+
+        # bar variable tracking remaining items
+        remaining = item_limit
+        pbar = tqdm(
+            desc=f'Adaptive chunked sync [limit {item_limit}]',
+            total=remaining,
+        )
+
+        with pbar as _:
+            while remaining > 0:
+                time_pcnt = chunk_timer / max(chunk_time or 1, 1) * 100
+                pbar.set_description(
+                    f'Adaptive chunked sync [size {chunk_size} (max {chunk_cap})] '
+                    f'[prev chunk {chunk_timer:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]'
+                )
+
+                # time the handler & processing sequence
+                chunk_time_start = time.time()
+
+                start_idx = item_limit - remaining
+                chunk_items = items[start_idx:start_idx+chunk_size]
+                handler_results = self._handle_chunk_items(chunk_items)
+                chunk_results.extend(self.process_chunk(handler_results))
+
+                chunk_timer = time.time() - chunk_time_start
+
+                # populate the aggregate chunk report
+                chunk_report['size']  += chunk_size
+                chunk_report['timer'] += chunk_timer
+                chunk_report['count'] += 1
+
+                # remove the number of processed items from those remaining
+                remaining -= chunk_size
+                pbar.update(n=chunk_size)
+
+                # re-calculate the chunk size with a simple EMA when a target
+                # chunk time is set (otherwise the size stays at the cap)
+                if chunk_time is not None:
+                    s_per_item = chunk_timer / max(chunk_size, 1)
+                    new_target = int(chunk_time / max(s_per_item, 1e-6))
+                    chunk_size = int(0.5*new_target + 0.5*chunk_size)
+
+                # apply the chunk cap and clip by remaining items
+                chunk_size = min(chunk_size, chunk_cap, remaining)
+
+                # ensure chunk size is >= 1 to prevent loop stalling
+                chunk_size = max(chunk_size, 1)
+
+        avg_chunk_size = chunk_report['size'] / chunk_report['count']
+        avg_chunk_time = chunk_report['timer'] / chunk_report['count']
+        avg_time_match = avg_chunk_time / max(chunk_time or avg_chunk_time, 1e-6) * 100
+
+        logger.info(f'Sync report: ')
+        logger.info(f'-> Total chunks           : {chunk_report["count"]} ')
+        logger.info(f'-> Total items processed  : {chunk_report["size"]} / {item_limit} ')
+        logger.info(f'-> Total time spent       : {chunk_report["timer"]:.2f}s ')
+        logger.info(f'-> Average chunk size     : {avg_chunk_size:.2f} ')
+        logger.info(f'-> Average time/chunk     : {avg_chunk_time:.2f}s / {chunk_time}s')
+        logger.info(f'-> Average time match     : {avg_time_match:.2f}% ')
+
+        return chunk_results
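As a rough sketch of how the handler hooks compose (names and staged actions are illustrative assumptions, not part of the patch), a subclass pairing with the `NameDiffer` above might stage inserts and deletes per chunk:

    from co3.syncer import Syncer

    class NameSyncer(Syncer[str]):
        def handle_l_excl(self, key: str, val: list):
            return ('insert', key, val)    # on disk only -> stage an insert

        def handle_r_excl(self, key: str, val: list):
            return ('delete', key)         # in the DB only -> stage a delete

        def handle_lr_int(self, key: str, val: tuple[list, list]):
            return ('keep', key)           # present in both

        def process_chunk(self, handler_results):
            # a real implementation might bulk-execute the staged actions here
            return handler_results

    # syncer = NameSyncer(NameDiffer(DiskResource(), db_resource))
    # results = syncer.chunked_sync({'path_list': 'notes'}, {}, chunk_time=5)

For the resize step itself: with `chunk_time=10`, a 500-item chunk that took 25s gives 0.05 s/item, a new target of `int(10 / 0.05) = 200`, and a next chunk size of `int(0.5*200 + 0.5*500) = 350`.
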
diff --git a/co3/util/__init__.py b/co3/util/__init__.py
index da83426..4edd437 100644
--- a/co3/util/__init__.py
+++ b/co3/util/__init__.py
@@ -1,3 +1,4 @@
 from co3.util import db
 from co3.util import regex
 from co3.util import types
+from co3.util import paths
diff --git a/co3/util/paths.py b/co3/util/paths.py
new file mode 100644
index 0000000..f3dbebe
--- /dev/null
+++ b/co3/util/paths.py
@@ -0,0 +1,40 @@
+from pathlib import Path
+
+from wcmatch import glob as wc_glob
+
+
+def iter_nested_paths(path: Path, ext: str | None = None, no_dir=False, relative=False):
+    if ext is None: ext = ''
+    return iter_glob_paths(f'**/!(.*|*.tmp|*~)*{ext}', path, no_dir=no_dir, relative=relative)
+
+def iter_glob_paths(_glob, path: Path, no_dir=False, relative=False):
+    '''
+    ``wc_glob`` ignores hidden files and directories by default: `**` only matches
+    non-hidden directories/files, `*` only non-hidden files.
+
+    Note: pattern quirks
+        - Under `wc_glob`, `**` (when GLOBSTAR is enabled) will match all files and
+          directories, recursively. Contrast this with `Path.rglob('**')`, which
+          matches just directories. For `wcmatch`, use `**/` to recover the
+          directory-only behavior.
+
+    Note: pattern behavior
+        - `*`: all files and dirs, non-recursive
+        - `**`: all files and dirs, recursive
+        - `*/`: all dirs, non-recursive
+        - `**/`: all dirs, recursive
+        - All file (no dir) equivalents: either of the first two, with `no_dir=True`
+    '''
+    flags = wc_glob.GLOBSTAR | wc_glob.EXTGLOB | wc_glob.NEGATE
+    if no_dir:
+        flags |= wc_glob.NODIR
+
+    glob_list = list(map(
+        Path,
+        wc_glob.glob(_glob, root_dir=path, flags=flags)
+    ))
+
+    if not relative:
+        glob_list = [Path(path, p) for p in glob_list]
+
+    return glob_list
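A quick sketch of the documented pattern behavior against a hypothetical tree (`data/a.md`, `data/sub/b.md`, `data/.hidden`):

    from co3.util.paths import iter_glob_paths, iter_nested_paths

    iter_glob_paths('*', 'data', relative=True)    # [a.md, sub]
    iter_glob_paths('**', 'data', relative=True)   # [a.md, sub, sub/b.md]
    iter_glob_paths('**/', 'data', relative=True)  # [sub] -- dirs only
    iter_glob_paths('**', 'data', no_dir=True, relative=True)   # [a.md, sub/b.md]

    # non-hidden .md files, as full paths rooted at data/
    iter_nested_paths('data', ext='.md')
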
diff --git a/co3/util/types.py b/co3/util/types.py
index 7fe2b4a..d9000b7 100644
--- a/co3/util/types.py
+++ b/co3/util/types.py
@@ -1,5 +1,6 @@
-from typing import TypeVar
+from abc import abstractmethod
 from collections import namedtuple
+from typing import Protocol, TypeVar
 from dataclasses import is_dataclass, asdict
 
 import sqlalchemy as sa
@@ -7,6 +8,12 @@ import sqlalchemy as sa
 
 # custom types
 SQLTableLike = TypeVar('SQLTableLike', bound=sa.Table | sa.Subquery | sa.Join)
 
+class Equatable(Protocol):
+    """Protocol for annotating comparable types."""
+
+    @abstractmethod
+    def __eq__(self, other: 'Equatable') -> bool:
+        pass
 # type checking/conversion methods
 def is_dataclass_instance(obj) -> bool:
diff --git a/requirements.txt b/requirements.txt
index a4e9d6b..ae514bc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,5 +7,6 @@ sphinx-autodoc-typehints
 sqlalchemy
 numpy
+wcmatch
 
 pytest
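One note on satisfying ``Equatable`` in practice: since ``Differ.diff`` uses transformed items as dict and set keys, implementations also need to be hashable, not just comparable. A frozen dataclass is a convenient sketch (the ``FileKey`` type is illustrative, not part of the patch):

    from dataclasses import dataclass

    @dataclass(frozen=True)
    class FileKey:
        rel_path: str
        size: int    # distinguish same-named files with different contents

    # frozen dataclasses synthesize __eq__ and __hash__, so FileKey satisfies
    # Equatable structurally and can parameterize Differ[FileKey] / Syncer[FileKey]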