add syncer shutdown control and clean up logging
This commit is contained in:
parent
ee6c1ebec5
commit
1bc7752b06
@ -24,7 +24,7 @@ class DiskResource(SelectableResource):
|
||||
else:
|
||||
path_union = set(paths.iter_glob_paths(glob, path, **iter_path_kwargs))
|
||||
|
||||
path_agg.union(( (path, head) for head in path_union ))
|
||||
path_agg = path_agg.union(( (path, head) for head in path_union ))
|
||||
|
||||
return path_agg
|
||||
|
||||
|
@ -1,9 +1,14 @@
|
||||
import time
|
||||
import random
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
|
||||
from tqdm import tqdm
|
||||
from colorama import Fore, Back, Style
|
||||
|
||||
from co3.differ import Differ
|
||||
from co3.util.types import Equatable
|
||||
from co3.util.generic import text_mod
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -12,6 +17,8 @@ class Syncer[E: Equatable]:
|
||||
def __init__(self, differ: Differ[E]):
|
||||
self.differ = differ
|
||||
|
||||
self.should_exit = False
|
||||
|
||||
def handle_l_excl(self, key: E, val: list):
|
||||
return key
|
||||
|
||||
@ -27,6 +34,9 @@ class Syncer[E: Equatable]:
|
||||
def process_chunk(self, handler_results):
|
||||
return handler_results
|
||||
|
||||
def shutdown(self):
|
||||
self.should_exit = True
|
||||
|
||||
def _handle_chunk_items(self, membership_items):
|
||||
results = []
|
||||
for membership, item in membership_items:
|
||||
@ -163,10 +173,11 @@ class Syncer[E: Equatable]:
|
||||
|
||||
# if chunk time not set, set to the largest value: the chunk cap
|
||||
if chunk_time is None:
|
||||
chunk_time = 0
|
||||
chunk_size = chunk_cap
|
||||
else:
|
||||
# otherwise, assume 1s per item up to the cap size
|
||||
chunk_size = min(chunk_time, chunk_cap)
|
||||
chunk_size = max(min(chunk_time, chunk_cap), 1)
|
||||
|
||||
# chunk loop variables
|
||||
chunk_timer = 0
|
||||
@ -181,20 +192,20 @@ class Syncer[E: Equatable]:
|
||||
)
|
||||
|
||||
with pbar as _:
|
||||
while remaining > 0:
|
||||
time_pcnt = chunk_time_used / max(chunk_time, 1) * 100
|
||||
while remaining > 0 and not self.should_exit:
|
||||
time_pcnt = chunk_timer/chunk_time*100 if chunk_time else 100
|
||||
pbar.set_description(
|
||||
f'Adaptive chunked sync [size {chunk_size} (max {chunk_cap})] '
|
||||
f'[prev chunk {chunk_time_used:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]'
|
||||
f'[prev chunk {chunk_timer:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]'
|
||||
)
|
||||
|
||||
# time the handler & processing sequence
|
||||
chunk_time_start = time.time()
|
||||
|
||||
start_idx = limit - remaining
|
||||
start_idx = item_limit - remaining
|
||||
chunk_items = items[start_idx:start_idx+chunk_size]
|
||||
handler_results = self._handle_chunk_items(chunk_items)
|
||||
results.extend(self.process_chunk(handler_results))
|
||||
chunk_results.extend(self.process_chunk(handler_results))
|
||||
|
||||
chunk_timer = time.time() - chunk_time_start
|
||||
|
||||
@ -209,25 +220,36 @@ class Syncer[E: Equatable]:
|
||||
|
||||
# re-calculate the chunk size with a simple EMA
|
||||
s_per_item = chunk_timer / chunk_size
|
||||
new_target = int(chunk_time / s_per_item)
|
||||
chunk_size = 0.5*new_target + 0.5*chunk_size
|
||||
new_target = chunk_time / s_per_item
|
||||
chunk_size = int(0.5*new_target + 0.5*chunk_size)
|
||||
|
||||
# apply the chunk cap and clip by remaining items
|
||||
chunk_size = min(min(chunk_size, chunk_cap), remaining)
|
||||
|
||||
# ensure chunk size is >= 1 to prevent loop stalling
|
||||
chunk_size = max(chunk_size, 1)
|
||||
|
||||
avg_chunk_size = chunk_report['size'] / chunk_report['count']
|
||||
avg_chunk_time = chunk_report['time'] / chunk_report['count']
|
||||
avg_time_match = avg_chunk_time / chunk_5ime
|
||||
|
||||
logger.info(f'Sync report: ')
|
||||
logger.info(f'-> Total chunks : {chunk_report["count"]} ')
|
||||
logger.info(f'-> Total items processed : {chunk_report["size"]} / {limit} ')
|
||||
logger.info(f'-> Total time spent : {chunk_report["timer"]:.2f}s ')
|
||||
logger.info(f'-> Average chunk size : {avg_chunk_size:.2f} ')
|
||||
logger.info(f'-> Average time/chunk : {avg_chunk_time:.2f}s / {chunk_time}s')
|
||||
logger.info(f'-> Average time match : {avg_time_match:.2f}% ')
|
||||
if self.should_exit:
|
||||
logger.info(text_mod('Syncer received interrupt, sync loop exiting early', Fore.BLACK, Back.RED))
|
||||
|
||||
avg_chunk_size = chunk_report['size'] / max(chunk_report['count'], 1)
|
||||
avg_chunk_time = chunk_report['timer'] / max(chunk_report['count'], 1)
|
||||
avg_time_match = avg_chunk_time / chunk_time if chunk_time else 1
|
||||
|
||||
report_text = [
|
||||
f'-> Total chunks : {chunk_report["count"]} ',
|
||||
f'-> Total items processed : {chunk_report["size"]} / {item_limit}',
|
||||
f'-> Total time spent : {chunk_report["timer"]:.2f}s ',
|
||||
f'-> Average chunk size : {avg_chunk_size:.2f} ',
|
||||
f'-> Average time/chunk : {avg_chunk_time:.2f}s / {chunk_time}s',
|
||||
f'-> Average time match : {avg_time_match*100:.2f}% ',
|
||||
]
|
||||
|
||||
pad = 50
|
||||
color_args = []
|
||||
|
||||
logger.info(text_mod('Sync report', Style.BRIGHT, Fore.WHITE, Back.BLUE, pad=pad))
|
||||
for line in report_text:
|
||||
logger.info(text_mod(line, *color_args, pad=pad))
|
||||
|
||||
return chunk_results
|
||||
|
@ -2,3 +2,4 @@ from co3.util import db
|
||||
from co3.util import regex
|
||||
from co3.util import types
|
||||
from co3.util import paths
|
||||
from co3.util import generic
|
||||
|
9
co3/util/generic.py
Normal file
9
co3/util/generic.py
Normal file
@ -0,0 +1,9 @@
|
||||
from colorama import Fore, Back, Style
|
||||
|
||||
|
||||
def color_text(text, *colorama_args):
|
||||
return f"{''.join(colorama_args)}{text}{Style.RESET_ALL}"
|
||||
|
||||
def text_mod(text, *colorama_args, pad=0):
|
||||
return color_text(text.ljust(pad), *colorama_args)
|
||||
|
Loading…
Reference in New Issue
Block a user