implement generalized Differ and Syncer classes, add general Resource type

Sam G. 2024-05-11 03:09:07 -07:00
parent 1ed4722e77
commit ee6c1ebec5
11 changed files with 395 additions and 12 deletions


@@ -101,19 +101,23 @@ their SQLite FTS counterparts.
 from co3.accessor import Accessor
 from co3.co3 import CO3, collate
 from co3.collector import Collector
+from co3.component import Component
 #from co3.composer import Composer
 from co3.database import Database
+from co3.engine import Engine
 from co3.indexer import Indexer
 from co3.manager import Manager
 from co3.mapper import Mapper, ComposableMapper
-from co3.component import Component
 from co3.schema import Schema
-from co3.engine import Engine
+from co3.resource import Resource, SelectableResource
+from co3.differ import Differ
+from co3.syncer import Syncer
 
-from co3 import accessors
-from co3 import databases
-from co3 import managers
-from co3 import components
+from co3 import util
 from co3 import schemas
 from co3 import engines
-from co3 import util
+from co3 import managers
+from co3 import accessors
+from co3 import databases
+from co3 import resources
+from co3 import components


@@ -53,14 +53,13 @@ might otherwise be handled explicitly, as seen above.
 above) and group up operations as they please, reducing overhead. The Database then
 wraps up a few single-operation contexts where outer connection control is not needed.
 '''
 import logging
 
-from co3.accessor import Accessor
-from co3.manager import Manager
-from co3.indexer import Indexer
 from co3.engine import Engine
 from co3.schema import Schema
+from co3.manager import Manager
+from co3.indexer import Indexer
+from co3.accessor import Accessor
 
 logger = logging.getLogger(__name__)

co3/differ.py (new file, +58)

@@ -0,0 +1,58 @@
from typing import Any
from collections import defaultdict
from abc import ABCMeta, abstractmethod

from co3.util.types import Equatable
from co3.resource import SelectableResource


class Differ[E: Equatable](metaclass=ABCMeta):
    '''
    Compute diff sets (asymmetric exclusives and intersection) among ``Equatable``
    transformations of results from ``SelectableResource`` instances.
    '''
    def __init__(
        self,
        l_resource: SelectableResource,
        r_resource: SelectableResource,
    ):
        self.l_resource = l_resource
        self.r_resource = r_resource

    @abstractmethod
    def l_transform(self, item) -> E:
        '''
        Transform items from the left resource to the joint comparison space, i.e., to
        an instance of type ``Equatable``.
        '''
        raise NotImplementedError

    @abstractmethod
    def r_transform(self, item) -> E:
        '''
        Transform items from the right resource to the joint comparison space.
        '''
        raise NotImplementedError

    def diff(
        self,
        l_select_kwargs: dict,
        r_select_kwargs: dict,
    ) -> tuple[dict[E, Any], dict[E, Any], dict[E, Any]]:
        l_items = self.l_resource.select(**l_select_kwargs)
        r_items = self.r_resource.select(**r_select_kwargs)

        # group raw items under their transformed keys; several raw items may map to
        # the same key
        l_map: dict[E, list[Any]] = defaultdict(list)
        r_map: dict[E, list[Any]] = defaultdict(list)

        for item in l_items:
            l_map[self.l_transform(item)].append(item)

        for item in r_items:
            r_map[self.r_transform(item)].append(item)

        l_set: set[E] = set(l_map)
        r_set: set[E] = set(r_map)

        l_excl = { l: l_map[l] for l in l_set - r_set }
        r_excl = { r: r_map[r] for r in r_set - l_set }
        lr_int = { i: (l_map[i], r_map[i]) for i in l_set & r_set }

        return l_excl, r_excl, lr_int #, (l_map, r_map)
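
For orientation, here is a minimal sketch (not part of this commit) of what a concrete ``Differ`` could look like: it compares files reported by the new ``DiskResource`` against rows from some database-backed resource, using plain ``str`` paths as the shared ``Equatable`` space. The right-hand resource and its ``'path'`` key are hypothetical.

from pathlib import Path

from co3.differ import Differ
from co3.resources import DiskResource


class PathDiffer(Differ[str]):
    '''Hypothetical Differ comparing disk paths to stored path strings.'''

    def l_transform(self, item) -> str:
        # DiskResource.select yields (base path, relative head) pairs
        base, head = item
        return str(Path(base, head))

    def r_transform(self, item) -> str:
        # assumes the right resource returns mapping-like rows with a 'path' key
        return item['path']

A call like ``PathDiffer(disk, db).diff({'path_list': '/notes'}, {})`` would then return the left-exclusive, right-exclusive, and intersection maps keyed by path.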

co3/resource.py (new file, +9)

@@ -0,0 +1,9 @@
from typing import Protocol


class Resource:
    pass


class SelectableResource(Protocol):
    def select(self, component, *args, **kwargs):
        raise NotImplementedError
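
Because ``SelectableResource`` is a ``Protocol``, implementations do not need to inherit from it; any object exposing a compatible ``select`` satisfies it structurally. A tiny hypothetical in-memory resource, just to illustrate the shape of the contract:

class ListResource:
    '''Hypothetical resource that "selects" from a fixed in-memory list.'''

    def __init__(self, rows: list[dict]):
        self.rows = rows

    def select(self, component=None, *args, **kwargs) -> list[dict]:
        return list(self.rows)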


@@ -0,0 +1 @@
from co3.resources.disk import DiskResource

co3/resources/disk.py (new file, +30)

@@ -0,0 +1,30 @@
from pathlib import Path

from co3.util import paths
from co3.resource import SelectableResource


class DiskResource(SelectableResource):
    def select(
        self,
        path_list: str | Path | list[str | Path],
        glob: str | None = None,
    ) -> set[tuple[str | Path, Path]]:
        iter_path_kwargs = {'relative': True, 'no_dir': True}

        if not isinstance(path_list, list):
            path_list = [path_list]

        path_agg = set()
        for path in path_list:
            if glob is None:
                path_union = set(paths.iter_nested_paths(path, **iter_path_kwargs))
            else:
                path_union = set(paths.iter_glob_paths(glob, path, **iter_path_kwargs))

            # `set.union` returns a new set; use in-place `update` so the aggregate
            # actually accumulates (base path, relative head) pairs
            path_agg.update((path, head) for head in path_union)

        return path_agg
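
Assumed usage of ``DiskResource`` (the roots and glob are illustrative): each selected item is a ``(base path, relative head)`` pair, so the originating root is preserved alongside the relative location.

disk = DiskResource()

# all Markdown files beneath two roots, directories excluded
for base, head in disk.select(['/data/notes', '/data/wiki'], glob='**/*.md'):
    print(base, head)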

co3/syncer.py (new file, +233)

@@ -0,0 +1,233 @@
import time
import random
import logging
from collections import defaultdict

from tqdm import tqdm

from co3.differ import Differ
from co3.util.types import Equatable

logger = logging.getLogger(__name__)


class Syncer[E: Equatable]:
    def __init__(self, differ: Differ[E]):
        self.differ = differ

    def handle_l_excl(self, key: E, val: list):
        return key

    def handle_r_excl(self, key: E, val: list):
        return key

    def handle_lr_int(self, key: E, val: tuple[list, list]):
        return key

    def filter_diff_sets(self, l_excl, r_excl, lr_int):
        return l_excl, r_excl, lr_int

    def process_chunk(self, handler_results):
        return handler_results

    def _handle_chunk_items(self, membership_items):
        results = []
        for membership, item in membership_items:
            if membership == 0b10:
                res = self.handle_l_excl(*item)
            elif membership == 0b01:
                res = self.handle_r_excl(*item)
            elif membership == 0b11:
                res = self.handle_lr_int(*item)
            else:
                logger.debug('Incorrect membership for chunk item, skipping')
                continue

            results.append(res)

        return results

    def sync(
        self,
        l_select_kwargs: dict,
        r_select_kwargs: dict,
    ) -> list:
        return self.chunked_sync(l_select_kwargs, r_select_kwargs)

    def chunked_sync(
        self,
        l_select_kwargs : dict,
        r_select_kwargs : dict,
        chunk_time      : int | None = None,
        item_limit      : int | None = None,
        chunk_cap       : int | None = None,
    ) -> list:
        '''
        Sync diff sets through attached handlers in chunks.

        Chunk sizes are determined by a *desired processing duration*, i.e., how long
        should be spent aggregating items in handlers (``handle_*`` methods) and the
        subsequent call to ``process_chunk``. This is particularly useful for
        database-driven interactions, where one wants to embrace bulk insertion (saving
        on repetitive, costly I/O-bound tasks) while performing intermittent consistency
        checks ("saving progress" along the way by inserting in batches; one could
        imagine otherwise performing a huge amount of computation only to encounter an
        error interacting with the database and subsequently rolling back the
        transaction, losing all results).

        Chunk *times* are provided (rather than directly specifying sizes) due to the
        variable nature of handlers and the items they need to process. For example, if
        handlers prepare events for bulk submission to an ``execlog.Router``, it's
        difficult to estimate how long the resulting execution traces will take to
        complete. A regular file may take 0.01s to finish preparing, whereas an audio
        file may kick off a 5 minute transcription job. Targeting the aggregate chunk
        processing time allows us to dynamically readjust batch sizes as more jobs
        complete.

        A few extra technical remarks:

        - We randomly shuffle the input items to even out any localized bias present in
          the order of items to be processed. This helps facilitate a more stable
          estimate of the average chunk duration and promotes ergodicity (a "typical"
          batch is more representative of the "average" batch).
        - We employ an exponential moving average over the times of completed chunks,
          more heavily weighting the durations of recently completed chunks when
          readjusting sizes. This is not because recent jobs reveal more about the
          average job length (we randomize these as mentioned above), but instead to
          reflect short-term departures from stationarity due to variable factors like
          resource allocation. For example, we'd like to quickly reflect the state of
          the CPU's capacity after the start/end of external processing tasks so we
          might more accurately estimate batch sizes.

        Note:
            This could be dangerous when going from fast file processing to slow note
            processing. Imagine estimating 1000 iter/sec, then transferring that to the
            next batch when it's more like 0.2 iter/sec: we would lose any chunking.
            (Luckily, in practice, notes are almost always processed before the huge
            set of nested files lurking below them, and we don't experience this
            issue.)

        .. admonition:: Sync strategy

            1. Compute diffs under the provided Differ between supported selection sets
               for its SelectableResources.
            2. Perform any additional filtering of the diff sets with
               ``filter_diff_sets``, producing the final set triplet.
            3. Begin a chunked processing loop for all items involved in the final diff
               sets. Items exclusive to the left resource are passed to
               ``handle_l_excl``, ``handle_r_excl`` for those exclusive to the right
               resource, and ``handle_lr_int`` for the left-right intersection. Note how
               this mechanism should inform the implementation of these methods for
               inheriting subclasses: items are only handled once they are part of an
               actively processed chunk. We don't first globally process the sets and
               "clean up" chunk-by-chunk, as this is less stable and can waste compute
               if we need to exit early.

        Parameters:
            chunk_time: desired processing time per batch, in seconds
            item_limit: number of items to sync before exiting
            chunk_cap:  upper limit on the number of items per chunk. Even if never
                        reached, setting this can help prevent anomalous, unbounded
                        time estimates that yield prohibitively sized chunks.
        '''
        # calculate diffs and filter the diff sets
        l_excl, r_excl, lr_int = self.filter_diff_sets(
            *self.differ.diff(
                l_select_kwargs,
                r_select_kwargs,
            )
        )

        # tag items with their diff-set membership
        items = []
        items.extend([(0b10, l) for l in l_excl.items()])
        items.extend([(0b01, r) for r in r_excl.items()])
        items.extend([(0b11, i) for i in lr_int.items()])

        # check for empty items
        if not items:
            logger.info('Sync has nothing to do, exiting')
            return []

        # shuffle items for ergodicity
        random.shuffle(items)

        # if item limit not set, set to all items
        if item_limit is None:
            item_limit = len(items)

        # if chunk cap not set, allow it to be the item limit
        if chunk_cap is None:
            chunk_cap = item_limit

        # chunk cap may be large, but cap it at the item limit
        chunk_cap = max(chunk_cap, 1) # ensure >= 1 to avoid infinite loop
        chunk_cap = min(chunk_cap, item_limit)

        # if chunk time not set, use the largest chunk size: the chunk cap
        if chunk_time is None:
            chunk_size = chunk_cap
        else:
            # otherwise, assume 1s per item, up to the cap size
            chunk_size = min(chunk_time, chunk_cap)

        # chunk loop variables
        chunk_timer = 0
        chunk_report = defaultdict(int)
        chunk_results = []

        # progress bar tracking remaining items
        remaining = item_limit
        pbar = tqdm(
            desc=f'Adaptive chunked sync [limit {item_limit}]',
            total=remaining,
        )

        with pbar as _:
            while remaining > 0:
                time_pcnt = (chunk_timer / chunk_time * 100) if chunk_time else 100.0
                pbar.set_description(
                    f'Adaptive chunked sync [size {chunk_size} (max {chunk_cap})] '
                    f'[prev chunk {chunk_timer:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]'
                )

                # time the handler & processing sequence
                chunk_time_start = time.time()

                start_idx = item_limit - remaining
                chunk_items = items[start_idx:start_idx + chunk_size]
                handler_results = self._handle_chunk_items(chunk_items)
                chunk_results.extend(self.process_chunk(handler_results))

                chunk_timer = time.time() - chunk_time_start

                # populate the aggregate chunk report
                chunk_report['size']  += chunk_size
                chunk_report['timer'] += chunk_timer
                chunk_report['count'] += 1

                # remove the number of processed items from those remaining
                remaining -= chunk_size
                pbar.update(n=chunk_size)

                # re-calculate the chunk size with a simple EMA when targeting a time;
                # guard against division by zero on very fast chunks
                if chunk_time is not None:
                    s_per_item = chunk_timer / max(chunk_size, 1)
                    new_target = int(chunk_time / max(s_per_item, 1e-6))
                    chunk_size = int(0.5 * new_target + 0.5 * chunk_size)

                # apply the chunk cap and clip by remaining items
                chunk_size = min(min(chunk_size, chunk_cap), remaining)

                # ensure chunk size is >= 1 to prevent loop stalling
                chunk_size = max(chunk_size, 1)

        avg_chunk_size = chunk_report['size'] / chunk_report['count']
        avg_chunk_time = chunk_report['timer'] / chunk_report['count']
        avg_time_match = (avg_chunk_time / chunk_time * 100) if chunk_time else 100.0

        logger.info(f'Sync report: ')
        logger.info(f'-> Total chunks           : {chunk_report["count"]} ')
        logger.info(f'-> Total items processed  : {chunk_report["size"]} / {item_limit} ')
        logger.info(f'-> Total time spent       : {chunk_report["timer"]:.2f}s ')
        logger.info(f'-> Average chunk size     : {avg_chunk_size:.2f} ')
        logger.info(f'-> Average time/chunk     : {avg_chunk_time:.2f}s / {chunk_time}s')
        logger.info(f'-> Average time match     : {avg_time_match:.2f}% ')

        return chunk_results
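
A sketch of how the pieces are meant to compose (illustrative only; the handler payloads and select kwargs are hypothetical): subclass ``Syncer``, override the ``handle_*`` hooks to turn diff-set entries into work items, and let ``process_chunk`` perform the bulk action per chunk. To make the adaptive sizing concrete, with ``chunk_time=30``: if a 200-item chunk takes 60s (0.3s/item), the new target is ``int(30 / 0.3) = 100`` and the EMA sets the next chunk to ``int(0.5*100 + 0.5*200) = 150`` items.

class PathSyncer(Syncer[str]):
    '''Hypothetical Syncer pairing a disk resource (left) with a database (right).'''

    def handle_l_excl(self, key: str, val: list):
        return ('insert', key)   # on disk only: new file to index

    def handle_r_excl(self, key: str, val: list):
        return ('delete', key)   # in the database only: stale row

    def handle_lr_int(self, key: str, val: tuple[list, list]):
        return ('update', key)   # present in both: refresh if needed

    def process_chunk(self, handler_results):
        # natural place for one bulk INSERT/DELETE/UPDATE per chunk
        return handler_results


syncer = PathSyncer(differ)  # a Differ instance, e.g. the PathDiffer sketched above
results = syncer.chunked_sync(
    {'path_list': '/data/notes', 'glob': '**/*.md'},  # left (disk) select kwargs
    {},                                               # right (db) select kwargs
    chunk_time=30,    # target ~30s of handler + processing work per chunk
    chunk_cap=1000,   # hard upper bound on items per chunk
)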


@@ -1,3 +1,4 @@
 from co3.util import db
 from co3.util import regex
 from co3.util import types
+from co3.util import paths

co3/util/paths.py (new file, +40)

@@ -0,0 +1,40 @@
from pathlib import Path

from wcmatch import glob as wc_glob


def iter_nested_paths(path: Path, ext: str | None = None, no_dir=False, relative=False):
    if ext is None: ext = ''
    return iter_glob_paths(f'**/!(.*|*.tmp|*~)*{ext}', path, no_dir=no_dir, relative=relative)


def iter_glob_paths(_glob, path: Path, no_dir=False, relative=False):
    '''
    wc_glob should ignore hidden files and directories by default; `**` only matches
    non-hidden directories/files, `*` only non-hidden files.

    Note: pattern quirks
        - Under `wc_glob`, `**` (when GLOBSTAR is enabled) will match all files and
          directories, recursively. Contrast this with `Path.rglob('**')`, which matches
          just directories. For `wcmatch`, use `**/` to recover the directory-only
          behavior.

    Note: pattern behavior
        - `*`: all files and dirs, non-recursive
        - `**`: all files and dirs, recursive
        - `*/`: all dirs, non-recursive
        - `**/`: all dirs, recursive
        - All file (no dir) equivalents: either of the first two, with `no_dir=True`
    '''
    flags = wc_glob.GLOBSTAR | wc_glob.EXTGLOB | wc_glob.NEGATE
    if no_dir:
        flags |= wc_glob.NODIR

    glob_list = list(map(
        Path,
        wc_glob.glob(_glob, root_dir=path, flags=flags)
    ))

    if not relative:
        glob_list = [Path(path, p) for p in glob_list]

    return glob_list
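
Assumed usage (roots are illustrative): ``iter_glob_paths`` takes a ``wcmatch`` pattern directly, while ``iter_nested_paths`` builds one that skips hidden and temporary files.

from pathlib import Path

# non-hidden Markdown files under a root, relative to it, directories excluded
md_paths = iter_glob_paths('**/*.md', Path('/data/notes'), no_dir=True, relative=True)

# every non-hidden, non-temporary file, returned as absolute paths
all_files = iter_nested_paths(Path('/data/notes'), no_dir=True)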


@@ -1,5 +1,6 @@
-from typing import TypeVar
+from abc import abstractmethod
 from collections import namedtuple
+from typing import Protocol, TypeVar
 from dataclasses import is_dataclass, asdict
 
 import sqlalchemy as sa
@@ -7,6 +8,12 @@ import sqlalchemy as sa
 # custom types
 SQLTableLike = TypeVar('SQLTableLike', bound=sa.Table | sa.Subquery | sa.Join)
 
+class Equatable(Protocol):
+    """Protocol for annotating comparable types."""
+
+    @abstractmethod
+    def __eq__(self, other: 'Equatable') -> bool:
+        pass
 
 # type checking/conversion methods
 def is_dataclass_instance(obj) -> bool:
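
Any value with sensible equality (and hashability, since ``Differ`` uses these values as dict and set keys) can play the ``Equatable`` role; a hypothetical frozen dataclass works as well as a plain ``str``:

from dataclasses import dataclass

@dataclass(frozen=True)  # frozen=True provides __eq__ and __hash__
class FileKey:
    relative_path: str
    checksum: str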


@@ -7,5 +7,6 @@ sphinx-autodoc-typehints
 sqlalchemy
 numpy
+wcmatch
 pytest