clean up general DB interfaces, make minor docstring revisions

Sam G. 2024-04-30 20:13:19 -07:00
parent b3dc2a0876
commit b05fdda61a
10 changed files with 118 additions and 683 deletions

.gitignore
View File

@@ -3,6 +3,7 @@ __pycache__/
.pytest_cache/
localsys.egg-info/
.ipynb_checkpoints/
.pytest_cache/
.python-version
# vendor and build files

View File

@@ -1,10 +1,9 @@
'''
Accessor
Provides access to an underlying schema through a supported set of operations. Class
methods could be general, high-level SQL wrappers, or convenience functions for common
schema-specific queries.
'''
import time
import inspect
from pathlib import Path
from collections import defaultdict
@@ -23,7 +22,16 @@ class Accessor[C: Component](metaclass=ABCMeta):
Parameters:
engine: SQLAlchemy engine to use for queries. Engine is initialized dynamically as
a property (based on the config) if not provided
Instance variables:
access_log: time-indexed log of access queries performed
'''
def __init__(self):
self.access_log = {}
def log_access(self, stmt):
self.access_log[time.time()] = f'{stmt}'
@abstractmethod
def raw_select(
self,
@@ -36,7 +44,7 @@ class Accessor[C: Component](metaclass=ABCMeta):
def select(
self,
connection,
component: C,
component: str | C,
*args,
**kwargs
):
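As a rough, hypothetical sketch (not part of this commit) of how a concrete subclass might satisfy the interface above and exercise the new ``log_access`` hook; ``DictAccessor`` and its dict-backed "connection" are illustrative stand-ins only:

.. code-block:: python

    from co3.accessor import Accessor
    from co3.components import Dictionary

    class DictAccessor(Accessor[Dictionary]):
        '''Toy accessor whose "connection" is just a plain dict.'''
        def raw_select(self, connection, stmt):
            self.log_access(stmt)            # records a time-indexed entry in access_log
            return connection.get(stmt)

        def select(self, connection, component, *args, **kwargs):
            # defer to raw_select with the component's (string) name
            return self.raw_select(connection, str(component))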

View File

@@ -2,25 +2,37 @@ import sqlalchemy as sa
from co3 import util
from co3.accessor import Accessor
from co3.accessors.sql import SQLAccessor
class FTSAccessor(Accessor):
def search(
'''
Perform queries on efficient full-text search (FTS) tables.
Note that this class doesn't inherit from ``SQLAccessor``, or even
``RelationalAccessor``. FTS tables aren't treated as Relations, due to restrictions
on their JOIN capabilities, and we avoid leaning on what composability they do have
so as not to accidentally introduce inefficiencies. Instead, only single FTS tables can
be selected from via this Accessor, and the FTSTable type, despite the use of the word
"table", is a direct child of the Component type.
'''
def __init__(self):
self.sql_accessor = SQLAccessor()
self.access_log = self.sql_accessor.access_log
def select(
self,
connection,
table_name : str,
select_cols : str | list | None = '*',
search_cols : str | None = None,
q : str | None = None,
colq : str | None = None,
snip_col : int | None = 0,
hl_col : int | None = 0,
limit : int | None = 100,
snip : int | None = 64,
tokenizer : str | None = 'unicode61',
group_by : str | None = None,
agg_cols : list | None = None,
wherein_dict: dict | None = None,
unique_on : dict | None = None,
select_cols : str | list | None = '*',
search_cols : str | None = None,
query : str | None = None,
col_query : str | None = None,
snip_col : int | None = 0,
hl_col : int | None = 0,
limit : int | None = 100,
snip : int | None = 64,
tokenizer : str | None = 'unicode61',
):
'''
Execute a search query against an indexed FTS table for specific primitives. This
@@ -33,23 +45,6 @@ class FTSAccessor(Accessor):
``MATCH``-based score for the text & column queries. Results are (a list of) fully
expanded dictionaries housing column-value pairs.
Note:
GROUP BY cannot be paired with SQLITE FTS extensions; thus, we perform manual
group checks on the result set in Python before response
Analysis:
The returned JSON structure has been (loosely) optimized for speed on the client
side. Fully forming individual dictionary-based responses saves time in
JavaScript, as the JSON parser is expected to be able to create the objects
faster than post-hoc construction in JS. This return structure was compared
against returning an array of arrays (all ordered in the same fashion), along with
a column list to be associated with each of the result values. While this saves
some size on the payload (the same column names don't have to be transmitted for
each result), the size of the returned content massively outweighs the
predominantly short column names. The only way this structure would be viable is
if a significant amount were saved on transfer compared to the slowdown in JS
object construction; this is (almost) never the case.
Parameters:
table_name : name of FTS table to search
search_cols : space separated string of columns to use for primary queries
@@ -62,28 +57,25 @@ class FTSAccessor(Accessor):
limit : maximum number of results to return in the SQL query
snip : snippet length (max: 64)
tokenizer : tokenizer to use (assumes relevant FTS table has been built)
...
wherein_dict: (col-name, value-list) pairs to match result set against, via
WHERE ... IN clauses
Returns:
Dictionary with search results (list of column indexed dictionaries) and relevant
metadata.
'''
search_q = ''
search_query = ''
if type(select_cols) is list:
select_cols = ', '.join(select_cols)
# construct main search query
if search_cols and q:
search_q = f'{{{search_cols}}} : {q}'
if search_cols and query:
search_query = f'{{{search_cols}}} : {query}'
# add auxiliary search constraints
if colq:
search_q += f' {colq}'
if col_query:
search_query += f' {col_query}'
search_q = search_q.strip()
search_query = search_query.strip()
hl_start = '<b><mark>'
hl_end = '</mark></b>'
@@ -99,8 +91,8 @@ class FTSAccessor(Accessor):
'''
where_clauses = []
if search_q:
where_clauses.append(f"{fts_table_name} MATCH '{search_q}'\n")
if search_query:
where_clauses.append(f"{fts_table_name} MATCH '{search_query}'\n")
if wherein_dict:
for col, vals in wherein_dict.items():
@@ -112,36 +104,6 @@ class FTSAccessor(Accessor):
sql += f'ORDER BY rank LIMIT {limit};'
row_dicts, cols = self.raw_select(sql, include_cols=True)
row_dicts, cols = self.sql_accessor.raw_select(connection, sql, include_cols=True)
if group_by is None:
return row_dicts, cols
if agg_cols is None:
agg_cols = []
# "group by" block ID and wrangle the links into a list
# note we can't perform native GROUP BYs with FTS results
group_by_idx = {}
for row in row_dicts:
group_by_attr = row.get(group_by)
# add new entries
for agg_col in agg_cols:
row[f'{agg_col}_agg'] = set()
if group_by_attr is None:
continue
if group_by_attr not in group_by_idx:
group_by_idx[group_by_attr] = row
for agg_col in agg_cols:
if agg_col in row:
group_by_idx[group_by_attr][f'{agg_col}_agg'].add(row[agg_col])
return {
'results' : group_by_idx,
'columns' : cols,
'num_results' : len(row_dicts),
}
return row_dicts, cols
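For orientation (not part of the diff), a hypothetical call using the renamed keyword arguments; the table and column names are illustrative, and ``connection`` is assumed to come from the same SQLAlchemy engine used by ``SQLAccessor``:

.. code-block:: python

    fts = FTSAccessor()
    rows, cols = fts.select(
        connection,
        table_name  = 'search',           # FTS5 table to query
        search_cols = 'title content',    # columns targeted by the MATCH expression
        query       = 'database',         # primary full-text query
        col_query   = 'title : sqlite',   # auxiliary column-scoped constraint
        limit       = 25,
    )
    # rows: list of fully expanded column->value dictionaries; cols: their column names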

View File

@@ -1,62 +1,58 @@
'''
Design proposal: variable backends
.. admonition:: Design proposal: variable backends
One particular feature not supported by the current type hierarchy is the possible use of
different backends to implement a general interface like SQLAccessor. One could imagine,
for instance, using ``sqlalchemy`` or ``sqlite`` to define the same methods laid out in a
parent class blueprint. It's not too difficult to imagine cases where both of these may be
useful, but for now it is outside the development scope. Should it ever enter the scope,
however, we might consider a simple ``backend`` argument on instantiation, keeping just the
SQLAccessor exposed rather than a whole set of backend-specific types:
.. code-block:: python
class SQLAlchemyAccessor(RelationalAccessor): # may also inherit from a dedicated interface parent
def select(...):
...
One particular feature not supported by the current type hierarchy is the possible use of
different backends to implement a general interface like SQLAccessor. One could imagine,
for instance, using ``sqlalchemy`` or ``sqlite`` to define the same methods laid out in a
parent class blueprint. It's not too difficult to imagine cases where both of these may be
useful, but for now it is outside the development scope. Should it ever enter the scope,
however, we might consider a simple ``backend`` argument on instantiation, keeping just the
SQLAccessor exposed rather than a whole set of backend-specific types:
class SQLiteAccessor(RelationalAccessor):
def select(...):
...
.. code-block:: python
class SQLAccessor(RelationalAccessor):
backends = {
'sqlalchemy': SQLAlchemyAccessor,
'sqlite': SQLiteAccessor,
}
class SQLAlchemyAccessor(RelationalAccessor): # may also inherit from a dedicated interface parent
def select(...):
...
class SQLiteAccessor(RelationalAccessor):
def select(...):
...
class SQLAccessor(RelationalAccessor):
backends = {
'sqlalchemy': SQLAlchemyAccessor,
'sqlite': SQLiteAccessor,
}
def __init__(self, backend: str):
self.backend = self.backends.get(backend)
def select(...):
return self.backend.select(...)
def __init__(self, backend: str):
self.backend = self.backends.get(backend)
def select(...):
return self.backend.select(...)
For now, we can look at SQLAccessor (and equivalents in other type hierarchies, like
SQLManagers) as being SQLAlchemyAccessors and not supporting any backend swapping. But in
theory, to make the above change, we'd just need to rename it and wrap it up.
For now, we can look at SQLAccessor (and equivalents in other type hierarchies, like
SQLManagers) as being SQLAlchemyAccessors and not supporting any backend swapping. But in
theory, to make the above change, we'd just need to rename it and wrap it up.
'''
from pathlib import Path
from collections.abc import Iterable
import inspect
from functools import cache
from pathlib import Path
import sqlalchemy as sa
from co3 import util
from co3.engines import SQLEngine
from co3.accessor import Accessor
from co3.components import Relation, SQLTable
class RelationalAccessor[R: Relation](Accessor[R]):
def raw_select(
self,
connection,
text: str
):
#connection.exec
raise NotImplementedError
def select(
@@ -110,6 +106,7 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
bind_params=bind_params,
include_cols=include_cols
)
self.log_access(sql)
if mappings:
return res.mappings().all()
@@ -165,6 +162,7 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
statement = statement.limit(limit)
res = SQLEngine.execute(connection, statement, include_cols=include_cols)
self.log_access(statement)
if mappings:
return res.mappings().all()
@@ -181,11 +179,15 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
returned dictionaries. This information is not available under CursorResults and thus
must be provided separately. This will yield results like the following:
[..., {'table1.col':<value>, 'table2.col':<value>, ...}, ...]
.. code-block:: python
[..., {'table1.col':<value>, 'table2.col':<value>, ...}, ...]
Instead of the automatic mapping names:
[..., {'col':<value>, 'col_1':<value>, ...}, ...]
.. code-block:: python
[..., {'col':<value>, 'col_1':<value>, ...}, ...]
which can make accessing certain results a little more intuitive.
'''
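To illustrate the access logging wired into the methods above (hypothetical usage, not part of the commit; ``engine`` and the raw SQL string are assumed placeholders):

.. code-block:: python

    sql = SQLAccessor()
    with engine.connect() as connection:
        sql.raw_select(connection, 'SELECT rpath, mtime FROM files LIMIT 5')

    # each raw_select/select call is now recorded as a time-indexed, stringified statement
    for timestamp, stmt in sql.access_log.items():
        print(timestamp, stmt)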

View File

@@ -1,10 +1,9 @@
import time
import pickle
import logging
from pathlib import Path
import time
import sqlalchemy as sa
#from sentence_transformers import SentenceTransformer, util
from co3.accessor import Accessor
@@ -55,10 +54,11 @@ class VSSAccessor(Accessor):
normalize_embeddings = True
)
def search(
def select(
self,
query : str,
connection,
index_name : str,
query : str,
limit : int = 10,
score_threshold = 0.5,
):
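Hypothetical usage of the renamed ``select`` (formerly ``search``); the index name, query, and connection handling below are assumptions for illustration, not documented behavior:

.. code-block:: python

    vss = VSSAccessor()
    hits = vss.select(
        connection,
        index_name      = 'blocks',   # e.g. one of the chunk/block/note embedding indexes
        query           = 'how are block embeddings aggregated?',
        limit           = 5,
        score_threshold = 0.5,
    )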

View File

@@ -111,6 +111,9 @@ class SQLTable(Relation[SQLTableLike]):
self.obj.join(_with.obj, on, isouter=outer)
)
class FTSTable(Relation[SQLTableLike]):
pass
# key-value stores
class Dictionary(Relation[dict]):
def get_attributes(self):

View File

@@ -1,12 +1,11 @@
from collections import defaultdict
import logging
import time
import logging
from collections import defaultdict
from tqdm import tqdm
import sqlalchemy as sa
from co3 import util
from co3.manager import Manager
from co3.accessors.sql import SQLAccessor
@@ -14,154 +13,24 @@ from co3.accessors.sql import SQLAccessor
logger = logging.getLogger(__name__)
class FTSManager(Manager):
def __init__(self, database, engine=None):
super().__init__(database, engine)
self.accessor = SQLAccessor(engine)
self.composer = CoreComposer()
def recreate_simple(self):
select_cols = [
tables.files.c.rpath, tables.files.c.ftype,
tables.notes.c.title, tables.notes.c.link,
tables.note_conversion_matter.c.type,
tables.blocks.c.name, tables.blocks.c.header,
tables.blocks.c.elem_pos,
tables.block_conversion_matter.c.content,
tables.links.c.target,
#tables.links.c.target.label('link_target'),
]
def __init__(self):
self.sql_accessor = SQLAccessor()
def recreate_from_table(self, table: str, cols):
inserts, res_cols = self.accessor.select(
'leftward_link_conversions',
cols=select_cols,
table,
cols=cols,
include_cols=True,
)
util.db.populate_fts5(
self.engine,
['search'],
#columns=select_cols,
columns=res_cols,
inserts=inserts,
)
def recreate_pivot(self):
block_cols = [
tables.files.c.rpath, tables.files.c.ftype,
tables.notes.c.title, tables.notes.c.link,
tables.note_conversion_matter.c.type,
tables.blocks.c.name, tables.blocks.c.header,
tables.blocks.c.elem_pos,
#tables.block_conversion_matter.c.content,
#tables.links.c.target,
#tables.links.c.target.label('link_target'),
]
lcp = self.accessor.link_content_pivot(cols=block_cols)
start = time.time()
inserts, cols = self.accessor.select(
lcp,
#cols=select_cols
include_cols=True,
)
logger.info(f'FTS recreate: pre-index SELECT took {time.time()-start:.2f}s')
return inserts
#util.db.populate_fts5(
# self.engine,
# ['search'],
# #columns=select_cols,
# columns=res_cols,
# inserts=inserts,
#)
def recreate(self):
select_cols = [
tables.links.c.id,
tables.files.c.rpath, tables.files.c.ftype,
tables.files.c.title, tables.files.c.link,
tables.notes.c.type,
tables.elements.c.name, tables.blocks.c.header,
tables.elements.c.elem_pos,
tables.element_conversions.c.format,
tables.element_conversions.c.content,
tables.links.c.target,
tables.links.c.type.label('link_type'),
#tables.links.c.target.label('link_target'),
]
start = time.time()
fts_basis = self.composer.get_table('leftward_link_conversions')
inserts = self.accessor.select(
fts_basis,
cols=select_cols,
)
logger.info(f'FTS recreate: pre-index SELECT took {time.time()-start:.2f}s')
start = time.time()
flattened_blocks = defaultdict(lambda:defaultdict(lambda:None))
other_results = []
# flatten conversions across supported formats
# i.e. put "src" and "html" into columns within a single row
for row in inserts:
name = row['name']
# remove pure identifier rows
link_id = row.pop('id')
_format = row.pop('format')
content = row.pop('content')
# add new derived rows
row['src'] = None
row['html'] = None
if name is None:
other_results.append(row)
continue
if flattened_blocks[name][link_id] is None:
flattened_blocks[name][link_id] = row
nrow = flattened_blocks[name][link_id]
if _format == 'src':
nrow['src'] = content
elif _format == 'html5':
nrow['html'] = content
inserts = other_results
for name, link_dict in flattened_blocks.items():
for link_id, row in link_dict.items():
inserts.append(row)
# should have at least one result to index
assert len(inserts) > 0
cols = inserts[0].keys()
# all keys should match
assert all([cols == d.keys() for d in inserts])
logger.info(f'FTS recreate: insert post-processing took {time.time()-start:.2f}s')
util.db.populate_fts5(
@@ -177,40 +46,3 @@ class FTSManager(Manager):
def migrate(self): pass
def recreate_old(self):
full_table = schema.leftward_block_conversions()
primitive_tables = schema.primitive_tables()
virtual_inserts = []
for ptype, table in primitive_tables.items():
table_rows = db.util.sa_exec_dicts(self.engine, sa.select(table))
for row in table_rows:
pdict = util.db.prepare_insert(full_table, row)
pdict['ptype'] = ptype
virtual_inserts.append(pdict)
select_cols = [
tables.blocks.c.name, tables.blocks.c.content,
tables.blocks.c.html, tables.blocks.c.header,
tables.blocks.c.block_pos, tables.blocks.c.note_name,
tables.links.c.to_file,
tables.notes.c.link,
tables.note_conversion_matter.c.type,
tables.note_conversion_matter.c.summary,
'ptype',
]
util.db.populate_fts5(
self.engine,
'search',
columns=select_cols,
inserts=virtual_inserts,
)
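A sketch (not in the diff) of how the generalized rebuild might be invoked, assuming the ``tables`` module and engine/accessor wiring used elsewhere in this commit:

.. code-block:: python

    fts_manager = FTSManager()
    fts_manager.recreate_from_table(
        'leftward_link_conversions',                        # source relation to index
        cols=[tables.files.c.rpath, tables.notes.c.title],  # columns to flatten into FTS
    )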

View File

@@ -1,5 +1,6 @@
'''
Note: Common on insert behavior
.. admonition:: Common on insert behavior
- Tables with unique constraints have been equipped with ``sqlite_on_conflict_unique``
flags, enabling conflicting bulk inserts to replace conflicting rows gracefully. No
need to worry about explicitly handling upserts.
@@ -9,7 +10,8 @@ Note: Common on insert behavior
bulk calls AND allow update/replacements on conflict; the setting above allows this
bulk usage to continue as is.
Note: Options for insert/update model
.. admonition:: Options for insert/update model
1. Collect all possible update objects, and use a SQLite bulk insert that only updates
if needed (based on modified time, for example). This sounds on the surface like
the right approach since we defer as much to bulk SQL logic, but it's far and away
@@ -50,8 +52,6 @@ from co3.engines import SQLEngine
from co3.manager import Manager
from co3.components import Relation, SQLTable
#from localsys.reloader.router._base import ChainRouter, Event
logger = logging.getLogger(__name__)
@@ -76,24 +76,21 @@ class SQLManager(RelationalManager[SQLTable]):
def __init__(self, *args, **kwargs):
'''
The insert lock is a _reentrant lock_, meaning the same thread can acquire the
lock again with out deadlocking (simplifying across methods of this class that
lock again without deadlocking (simplifying across methods of this class that
need it).
'''
super().__init__(*args, **kwargs)
self.routers = []
self._router = None
self._insert_lock = threading.RLock()
@property
def router(self):
if self._router is None:
self._router = ChainRouter(self.routers)
return self._router
def update(self):
pass
def add_router(self, router):
self.routers.append(router)
def migrate(self):
pass
def sync(self):
pass
def recreate(
self,
@@ -112,8 +109,6 @@ class SQLManager(RelationalManager[SQLTable]):
metadata.drop_all(engine.manager)
metadata.create_all(engine.manager, checkfirst=True)
def update(self): pass
def insert(
self,
connection,
@@ -167,246 +162,3 @@ class SQLManager(RelationalManager[SQLTable]):
return res_list
def _file_sync_bools(self):
synced_bools = []
fpaths = utils.paths.iter_nested_paths(self.collector.basepath, no_dir=True)
db_rpath_index = self.database.files.group_by('rpath')()
for fpath in fpaths:
file = File(fpath, self.collector.basepath)
db_file = db_rpath_index.get(file.rpath)
file_in_sync = db_file and float(db_file.get('mtime','0')) >= file.mtime
synced_bools.append(file_in_sync)
return fpaths, synced_bools
def sync(
self,
limit=0,
chunk_time=0,
dry=False,
chunk_size_cap=1000,
print_report=True,
):
'''
Note:
Could be dangerous if going from fast file processing to note processing.
Imagine estimating 1000 iter/sec, then transferring that to the next batch
when it's more like 0.2 iter/sec. We would lose any chunking. (Luckily, in
practice, it turns out that notes are almost always processed before the huge set
of nested files, so we don't experience this issue.)
'''
#filepaths, update_bools = self.collector.file_inserts(dry_run=True)
#fpaths_to_update = [f for (f, b) in zip(filepaths, update_bools) if not b]
filepaths, update_bools = self._file_sync_bools()
fpaths_to_update = [f for (f, b) in zip(filepaths, update_bools) if not b]
def calc_chunk_size(sec_per_file):
'''
The chunk cap is only applied here; if the chunk size is set manually elsewhere in the
method, it's intentional, e.g., if chunk_time <= 0 and we just want one big iteration.
'''
return min(max(int(chunk_time / sec_per_file), 1), chunk_size_cap)
# nothing to do
if not fpaths_to_update:
logger.info('Sync has nothing to do, exiting')
return None
ood_count = len(fpaths_to_update)
total_fps = len(filepaths)
ood_prcnt = ood_count/total_fps*100
logger.info(
f'Pre-sync scan yielded {ood_count}/{total_fps} ({ood_prcnt:.2f}%) out-of-date files'
)
if limit <= 0: limit = ood_count
if chunk_time <= 0:
chunk_size = ood_count
else:
chunk_size = calc_chunk_size(5)
dry_inserts = None
remaining = limit
chunk_time_used = 0
pbar = tqdm(
desc=f'Adaptive chunked sync for [limit {limit}]',
total=remaining,
)
report = []
def collect_report():
report.append({
'chunk_size': chunk_limit,
'chunk_time_used': chunk_time_used,
})
with pbar as _:
while remaining > 0:
time_pcnt = chunk_time_used / max(chunk_time,1) * 100
pbar.set_description(
f'Adaptive chunked sync [size {chunk_size} (max {chunk_size_cap})] '
f'[prev chunk {chunk_time_used:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]'
)
chunk_limit = min(remaining, chunk_size)
start_idx = limit - remaining
chunk_fpaths = fpaths_to_update[start_idx:start_idx+chunk_limit]
chunk_time_start = time.time()
#inserts = self.collector.collect_inserts(
# chunk_fpaths,
# skip_updated=False, # saves on DB check given provided list
#)
# 1) flush synthetic events for the batch through the chained router
# 2) block until completed and sweep up the collected inserts
futures = self.router.submit([
Event(
endpoint=str(self.collector.basepath),
name=str(path.relative_to(self.collector.basepath)),
action=[0xffffffff], # synthetic action to match any flag filters
) for path in chunk_fpaths
])
for future in tqdm(
as_completed(futures),
total=chunk_size,
desc=f'Awaiting chunk futures [submitted {len(futures)}/{chunk_size}]'
):
#print('Finished future', cfuture)
try:
result = future.result()
except Exception as e:
logger.warning(f"Sync job failed with exception {e}")
#wait(futures)#, timeout=10)
inserts = self.collector.collect_inserts()
chunk_time_used = time.time() - chunk_time_start
collect_report()
#if not inserts: break
if dry:
if dry_inserts is None:
dry_inserts = inserts
else:
for ik, iv in inserts.items():
dry_inserts[ik].extend(iv)
else:
self.insert(inserts)
remaining -= chunk_limit
# re-calculate chunk size
sec_per_file = chunk_time_used / chunk_size
chunk_size = calc_chunk_size(sec_per_file)
#pbar.update(n=limit-remaining)
pbar.update(n=chunk_limit)
if print_report:
num_chunks = len(report)
total_ins = sum(map(lambda c:c['chunk_size'], report))
avg_chunk = total_ins / num_chunks
total_time = sum(map(lambda c:c['chunk_time_used'], report))
avg_time = total_time / num_chunks
time_match = avg_time / max(chunk_time*100, 1)
print( 'Sync report: ')
print(f' Total chunks : {num_chunks} ')
print(f' Total file inserts : {total_ins} / {limit} ')
print(f' Average chunk size : {avg_chunk:.2f} ')
print(f' Total time spent : {total_time:.2f}s ')
print(f' Average time / chunk : {avg_time:.2f}s / {chunk_time}s')
print(f' Percent time match : {time_match:.2f}% ')
if dry: return dry_inserts
def migrate(self):
# create MD object representation current DB state
pre_metadata = sa.MetaData()
pre_metadata.reflect(bind=self.engine)
table_rows = {}
for table_name, table in pre_metadata.tables.items():
# pre-migration rows
select_results = utils.db.sa_execute(self.engine, sa.select(table))
named_results = utils.db.named_results(table, select_results)
table_rows[table_name] = named_results
#table_update_map = {
# 'files': (
# lambda r: File(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
# ),
# 'notes': (
# lambda r: Note(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
# ),
# 'note_conversion_matter': (
# lambda r: Note(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
# ),
#}
# update schema
self.recreate()
with self.engine.connect() as connection:
for table_name, table in tables.metadata.tables.items():
if table_name not in table_rows: continue
logger.info(f'Migrating table "{table_name}"')
if table_name == 'files':
update_note_rows = []
for i,note_row in enumerate(table_rows['files']):
file = File(Path(NOTEPATH, note_row['name']), NOTEPATH)
u_note_row = utils.db.prepare_insert(
tables.files,
file.get_table_data()
)
note_row.update({k:v for k,v in u_note_row.items() if v})
update_note_rows.append(note_row)
table_rows['files'] = update_note_rows
elif table_name == 'notes':
update_note_rows = []
for note_row in table_rows['notes']:
note = Note(Path(NOTEPATH, note_row['name']), NOTEPATH)
u_note_row = utils.db.prepare_insert(
tables.notes,
note.get_table_data()
)
note_row.update({k:v for k,v in u_note_row.items() if v})
update_note_rows.append(note_row)
table_rows['notes'] = update_note_rows
# generic update blueprint for later; now doesn't matter as conversion based
# inserts require re-converting...so you may as well just do a full re-build.
#update_rows = []
#if table_name in table_update_map:
# for i,row in enumerate(table_rows['files']):
# update_obj = table_update_map[table_name](row)
# u_row = utils.db.prepare_insert(
# table,
# note.get_table_data()
# )
# note_row.update({k:v for k,v in u_note_row.items() if v})
# update_note_rows.append(note_row)
connection.execute(
sa.insert(table),
table_rows[table_name]
)
connection.commit()
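For reference, a worked sketch of the adaptive chunk sizing used by the (now removed) ``sync`` method above, with illustrative numbers (a 30s chunk budget and the 1000-file cap):

.. code-block:: python

    def calc_chunk_size(sec_per_file, chunk_time=30, chunk_size_cap=1000):
        # target roughly chunk_time seconds of work per chunk, clamped to [1, cap]
        return min(max(int(chunk_time / sec_per_file), 1), chunk_size_cap)

    calc_chunk_size(5)      # -> 6     slow files: only a few per chunk
    calc_chunk_size(0.01)   # -> 1000  fast files: clamped to chunk_size_cap
    calc_chunk_size(120)    # -> 1     never drops below one file per chunk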

View File

@@ -1,142 +1,16 @@
from collections import defaultdict
import logging
import pickle
import time
from pathlib import Path
import numpy as np
from tqdm import tqdm
import sqlalchemy as sa
from co3.manager import Manager
from co3 import util
logger = logging.getLogger(__name__)
class VSSManager(Manager):
def __init__(self, database, engine=None):
super().__init__(database, engine)
def recreate(self): pass
from localsys.db.databases.core import CoreDatabase
self.core_db = CoreDatabase()
def recreate(self):
start = time.time()
chunks = []
chunk_ids = []
block_map = {}
for note_name, note_groupby in self.core_db.blocks().items():
blocks = note_groupby.get('aggregates', [])
for block in blocks:
if block.get('format') != 'src': continue
block_name = block.get('name', '')
block_content = block.get('content', '')
block_map[block_name] = block_content
block_chunks, chunk_ranges = utils.embed.split_block(block_content)
chunks.extend(block_chunks)
chunk_ids.extend([f'{block_name}@{r[0]}:{r[1]}' for r in chunk_ranges])
logger.info(f'VSS pre-index SELECT took {time.time()-start:.2f}s')
start = time.time()
# one-time batched embedding
chunk_embeddings = self.database.access.embed_chunks(chunks)
logger.info(f'VSS embeddings took {time.time()-start:.2f}s')
start = time.time()
# aggregate embeddings up the content hierarchy
zero_embed = lambda: np.atleast_2d(
np.zeros(self.database.access._embedding_size, dtype='float32')
)
block_embedding_map = defaultdict(zero_embed)
note_embedding_map = defaultdict(zero_embed)
for i, chunk_id in enumerate(chunk_ids):
at_split = chunk_id.split('@')
note_name = at_split[0]
block_name = '@'.join(at_split[:3])
block_embedding_map[block_name] += chunk_embeddings[i]
note_embedding_map[note_name] += chunk_embeddings[i]
# re-normalize aggregated embeddings
# > little inefficient but gobbled up asymptotically by chunk processing
# > could o/w first convert each map to a full 2D np array, then back
for embed_map in [block_embedding_map, note_embedding_map]:
for k, agg_embedding in embed_map.items():
embed_map[k] = utils.embed.unit_row_wise(agg_embedding)
logger.info(f'VSS recreate: pre-index SELECT took {time.time()-start:.2f}s')
# cols = ['rowid', 'chunk_embedding']
# def pack_inserts(embeddings):
# return [
# { cols[0]: i, cols[1]: embedding.tobytes() }
# for i, embedding in enumerate(embeddings)
# ]
# chunk_inserts = pack_inserts(chunk_embeddings)
# block_inserts = pack_inserts(block_embedding_map.values())
# note_inserts = pack_inserts(note_embedding_map.values())
chunk_ids = chunk_ids
block_ids = list(block_embedding_map.keys())
note_ids = list(note_embedding_map.keys())
block_embeddings = np.vstack(list(block_embedding_map.values()))
note_embeddings = np.vstack(list(note_embedding_map.values()))
blocks = [block_map[b] for b in block_ids]
notes = note_ids
self.database.access.write_embeddings({
'chunks': (chunk_ids, chunk_embeddings, chunks),
'blocks': (block_ids, block_embeddings, blocks),
'notes' : (note_ids, note_embeddings, notes),
})
logger.info(f'Post-embedding parsing took {time.time()-start:.2f}s')
# utils.db.create_vss0(
# self.engine,
# 'chunks',
# columns=list(cols),
# inserts=chunk_inserts,
# populate=True,
# reset=True,
# embedding_size=self._embedding_size,
# )
def update(self): pass
def insert(self): pass
def sync(self): pass
def migrate(self): pass
def _archive_note_embed(self):
for block in blocks:
content = block.get('content')
if not content: continue
block_chunks = utils.embed.split_block(content)
chunks.extend(block_chunks)
embeddings = utils.embed.embed_chunks(chunks)
block_embeddings = utils.embed.map_groups(embeddings, embed_block_map)
self.convert_map['vect'] = block_embeddings
note_embedding = utils.embed.unit_row_wise(
embeddings.sum(axis=0).reshape(1, -1)
)
return {
'content': note_embedding.tobytes(),
}
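A minimal numpy sketch (illustrative only) of the aggregation performed by the removed ``recreate``: chunk embeddings are summed into their parent block or note, then re-normalized row-wise. ``unit_row_wise`` is re-implemented here for the example, and the 384-dim size is just a placeholder for ``_embedding_size``:

.. code-block:: python

    import numpy as np

    def unit_row_wise(arr):
        # scale each row to unit L2 norm
        return arr / np.linalg.norm(arr, axis=1, keepdims=True)

    chunk_embeddings = unit_row_wise(np.random.rand(3, 384).astype('float32'))
    block_embedding  = chunk_embeddings.sum(axis=0, keepdims=True)  # aggregate up the hierarchy
    block_embedding  = unit_row_wise(block_embedding)               # re-normalize the aggregate
    print(block_embedding.shape)  # (1, 384)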

View File

@@ -59,6 +59,7 @@ class Mapper[C: Component]:
inserts (hence why we tie Mappers to Schemas one-to-one).
.. admonition:: Dev note
The Composer needs reconsideration, or at least its positioning directly in this
class. It may be more appropriate to live at the Schema level, or even be
dissolved altogether if arbitrary named Components can be attached to schemas.