From 1bc7752b06293612a49356dd452c679ff629edbc Mon Sep 17 00:00:00 2001 From: "Sam G." Date: Sun, 12 May 2024 00:46:49 -0700 Subject: [PATCH] add syncer shutdown control and clean up logging --- co3/resources/disk.py | 2 +- co3/syncer.py | 60 +++++++++++++++++++++++++++++-------------- co3/util/__init__.py | 1 + co3/util/generic.py | 9 +++++++ 4 files changed, 52 insertions(+), 20 deletions(-) create mode 100644 co3/util/generic.py diff --git a/co3/resources/disk.py b/co3/resources/disk.py index 556b46e..6fc1f15 100644 --- a/co3/resources/disk.py +++ b/co3/resources/disk.py @@ -24,7 +24,7 @@ class DiskResource(SelectableResource): else: path_union = set(paths.iter_glob_paths(glob, path, **iter_path_kwargs)) - path_agg.union(( (path, head) for head in path_union )) + path_agg = path_agg.union(( (path, head) for head in path_union )) return path_agg diff --git a/co3/syncer.py b/co3/syncer.py index 9f9afe4..0ab0524 100644 --- a/co3/syncer.py +++ b/co3/syncer.py @@ -1,9 +1,14 @@ +import time import random import logging from collections import defaultdict +from tqdm import tqdm +from colorama import Fore, Back, Style + from co3.differ import Differ from co3.util.types import Equatable +from co3.util.generic import text_mod logger = logging.getLogger(__name__) @@ -12,6 +17,8 @@ class Syncer[E: Equatable]: def __init__(self, differ: Differ[E]): self.differ = differ + self.should_exit = False + def handle_l_excl(self, key: E, val: list): return key @@ -27,6 +34,9 @@ class Syncer[E: Equatable]: def process_chunk(self, handler_results): return handler_results + def shutdown(self): + self.should_exit = True + def _handle_chunk_items(self, membership_items): results = [] for membership, item in membership_items: @@ -163,10 +173,11 @@ class Syncer[E: Equatable]: # if chunk time not set, set to the largest value: the chunk cap if chunk_time is None: + chunk_time = 0 chunk_size = chunk_cap else: # otherwise, assume 1s per item up to the cap size - chunk_size = min(chunk_time, chunk_cap) + chunk_size = max(min(chunk_time, chunk_cap), 1) # chunk loop variables chunk_timer = 0 @@ -181,20 +192,20 @@ class Syncer[E: Equatable]: ) with pbar as _: - while remaining > 0: - time_pcnt = chunk_time_used / max(chunk_time, 1) * 100 + while remaining > 0 and not self.should_exit: + time_pcnt = chunk_timer/chunk_time*100 if chunk_time else 100 pbar.set_description( f'Adaptive chunked sync [size {chunk_size} (max {chunk_cap})] ' - f'[prev chunk {chunk_time_used:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]' + f'[prev chunk {chunk_timer:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]' ) # time the handler & processing sequence chunk_time_start = time.time() - start_idx = limit - remaining + start_idx = item_limit - remaining chunk_items = items[start_idx:start_idx+chunk_size] handler_results = self._handle_chunk_items(chunk_items) - results.extend(self.process_chunk(handler_results)) + chunk_results.extend(self.process_chunk(handler_results)) chunk_timer = time.time() - chunk_time_start @@ -209,25 +220,36 @@ class Syncer[E: Equatable]: # re-calculate the chunk size with a simple EMA s_per_item = chunk_timer / chunk_size - new_target = int(chunk_time / s_per_item) - chunk_size = 0.5*new_target + 0.5*chunk_size + new_target = chunk_time / s_per_item + chunk_size = int(0.5*new_target + 0.5*chunk_size) # apply the chunk cap and clip by remaining items chunk_size = min(min(chunk_size, chunk_cap), remaining) # ensure chunk size is >= 1 to prevent loop stalling chunk_size = max(chunk_size, 1) - - avg_chunk_size = chunk_report['size'] / chunk_report['count'] - avg_chunk_time = chunk_report['time'] / chunk_report['count'] - avg_time_match = avg_chunk_time / chunk_5ime - logger.info(f'Sync report: ') - logger.info(f'-> Total chunks : {chunk_report["count"]} ') - logger.info(f'-> Total items processed : {chunk_report["size"]} / {limit} ') - logger.info(f'-> Total time spent : {chunk_report["timer"]:.2f}s ') - logger.info(f'-> Average chunk size : {avg_chunk_size:.2f} ') - logger.info(f'-> Average time/chunk : {avg_chunk_time:.2f}s / {chunk_time}s') - logger.info(f'-> Average time match : {avg_time_match:.2f}% ') + if self.should_exit: + logger.info(text_mod('Syncer received interrupt, sync loop exiting early', Fore.BLACK, Back.RED)) + + avg_chunk_size = chunk_report['size'] / max(chunk_report['count'], 1) + avg_chunk_time = chunk_report['timer'] / max(chunk_report['count'], 1) + avg_time_match = avg_chunk_time / chunk_time if chunk_time else 1 + + report_text = [ + f'-> Total chunks : {chunk_report["count"]} ', + f'-> Total items processed : {chunk_report["size"]} / {item_limit}', + f'-> Total time spent : {chunk_report["timer"]:.2f}s ', + f'-> Average chunk size : {avg_chunk_size:.2f} ', + f'-> Average time/chunk : {avg_chunk_time:.2f}s / {chunk_time}s', + f'-> Average time match : {avg_time_match*100:.2f}% ', + ] + + pad = 50 + color_args = [] + + logger.info(text_mod('Sync report', Style.BRIGHT, Fore.WHITE, Back.BLUE, pad=pad)) + for line in report_text: + logger.info(text_mod(line, *color_args, pad=pad)) return chunk_results diff --git a/co3/util/__init__.py b/co3/util/__init__.py index 4edd437..8769816 100644 --- a/co3/util/__init__.py +++ b/co3/util/__init__.py @@ -2,3 +2,4 @@ from co3.util import db from co3.util import regex from co3.util import types from co3.util import paths +from co3.util import generic diff --git a/co3/util/generic.py b/co3/util/generic.py new file mode 100644 index 0000000..922cb2f --- /dev/null +++ b/co3/util/generic.py @@ -0,0 +1,9 @@ +from colorama import Fore, Back, Style + + +def color_text(text, *colorama_args): + return f"{''.join(colorama_args)}{text}{Style.RESET_ALL}" + +def text_mod(text, *colorama_args, pad=0): + return color_text(text.ljust(pad), *colorama_args) +