clean up general DB interfaces, make minor docstring revisions
This commit is contained in:
parent
dffc538fba
commit
06eb7b1047
1
.gitignore
vendored
1
.gitignore
vendored
@ -3,6 +3,7 @@ __pycache__/
|
||||
.pytest_cache/
|
||||
localsys.egg-info/
|
||||
.ipynb_checkpoints/
|
||||
.pytest_cache/
|
||||
.python-version
|
||||
|
||||
# vendor and build files
|
||||
|
@ -37,3 +37,8 @@ co3/util/__init__.py
|
||||
co3/util/db.py
|
||||
co3/util/regex.py
|
||||
co3/util/types.py
|
||||
tests/test_co3.py
|
||||
tests/test_database.py
|
||||
tests/test_imports.py
|
||||
tests/test_mapper.py
|
||||
tests/test_schema.py
|
@ -1,10 +1,9 @@
|
||||
'''
|
||||
Accessor
|
||||
|
||||
Provides access to an underlying schema through a supported set of operations. Class
|
||||
methods could be general, high-level SQL wrappers, or convenience functions for common
|
||||
schema-specific queries.
|
||||
'''
|
||||
import time
|
||||
import inspect
|
||||
from pathlib import Path
|
||||
from collections import defaultdict
|
||||
@ -23,7 +22,16 @@ class Accessor[C: Component](metaclass=ABCMeta):
|
||||
Parameters:
|
||||
engine: SQLAlchemy engine to use for queries. Engine is initialized dynamically as
|
||||
a property (based on the config) if not provided
|
||||
|
||||
Instance variables:
|
||||
access_log: time-indexed log of access queries performed
|
||||
'''
|
||||
def __init__(self):
|
||||
self.access_log = {}
|
||||
|
||||
def log_access(self, stmt):
|
||||
self.access_log[time.time()] = f'{stmt}'
|
||||
|
||||
@abstractmethod
|
||||
def raw_select(
|
||||
self,
|
||||
@ -36,7 +44,7 @@ class Accessor[C: Component](metaclass=ABCMeta):
|
||||
def select(
|
||||
self,
|
||||
connection,
|
||||
component: C,
|
||||
component: str | C,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
|
@ -2,25 +2,37 @@ import sqlalchemy as sa
|
||||
|
||||
from co3 import util
|
||||
from co3.accessor import Accessor
|
||||
from co3.accessors.sql import SQLAccessor
|
||||
|
||||
|
||||
class FTSAccessor(Accessor):
|
||||
def search(
|
||||
'''
|
||||
Perform queries on efficient full-text search (FTS) tables.
|
||||
|
||||
Note how this class doesn't inherit from ``SQLAccessor``, or even
|
||||
``RelationalAccessor``. We don't look at FTS tables as Relations, due to restrictions
|
||||
on their JOIN capabilities, and avoid embracing what composability they may have to
|
||||
prevent accidentally introducing inefficiencies. Instead, just single FTS tables can
|
||||
be selected from via this Accessor, and the FTSTable type, despite the use of the word
|
||||
"table", is a direct child of the Component type.
|
||||
'''
|
||||
def __init__(self):
|
||||
self.sql_accessor = SQLAccessor()
|
||||
self.access_log = sql_accessor.access_log
|
||||
|
||||
def select(
|
||||
self,
|
||||
connection,
|
||||
table_name : str,
|
||||
select_cols : str | list | None = '*',
|
||||
search_cols : str | None = None,
|
||||
q : str | None = None,
|
||||
colq : str | None = None,
|
||||
snip_col : int | None = 0,
|
||||
hl_col : int | None = 0,
|
||||
limit : int | None = 100,
|
||||
snip : int | None = 64,
|
||||
tokenizer : str | None = 'unicode61',
|
||||
group_by : str | None = None,
|
||||
agg_cols : list | None = None,
|
||||
wherein_dict: dict | None = None,
|
||||
unique_on : dict | None = None,
|
||||
select_cols : str | list | None = '*',
|
||||
search_cols : str | None = None,
|
||||
query : str | None = None,
|
||||
col_query : str | None = None,
|
||||
snip_col : int | None = 0,
|
||||
hl_col : int | None = 0,
|
||||
limit : int | None = 100,
|
||||
snip : int | None = 64,
|
||||
tokenizer : str | None = 'unicode61',
|
||||
):
|
||||
'''
|
||||
Execute a search query against an indexed FTS table for specific primitives. This
|
||||
@ -33,23 +45,6 @@ class FTSAccessor(Accessor):
|
||||
``MATCH``-based score for the text & column queries. Results are (a list of) fully
|
||||
expanded dictionaries housing column-value pairs.
|
||||
|
||||
Note:
|
||||
GROUP BY cannot be paired with SQLITE FTS extensions; thus, we perform manual
|
||||
group checks on the result set in Python before response
|
||||
|
||||
Analysis:
|
||||
The returned JSON structure has been (loosely) optimized for speed on the client
|
||||
side. Fully forming individual dictionary based responses saves time in
|
||||
Javascript, as the JSON parser is expected to be able to create the objects
|
||||
faster than post-hoc construction in JS. This return structure was compared
|
||||
against returning an array of arrays (all ordered in the same fashion), along with
|
||||
a column list to be associated with each of the result values. While this saves
|
||||
some size on the payload (the same column names don't have to be transmitted for
|
||||
each result), the size of the returned content massively outweighs the
|
||||
predominantly short column names. The only way this structure would be viable is
|
||||
if a significant amount was saved on transfer compared to the slow down in JS
|
||||
object construction; this is (almost) never the case.
|
||||
|
||||
Parameters:
|
||||
table_name : name of FTS table to search
|
||||
search_cols : space separated string of columns to use for primary queries
|
||||
@ -62,28 +57,25 @@ class FTSAccessor(Accessor):
|
||||
limit : maximum number of results to return in the SQL query
|
||||
snip : snippet length (max: 64)
|
||||
tokenizer : tokenizer to use (assumes relevant FTS table has been built)
|
||||
...
|
||||
wherein_dict: (col-name, value-list) pairs to match result set against, via
|
||||
WHERE ... IN clauses
|
||||
|
||||
Returns:
|
||||
Dictionary with search results (list of column indexed dictionaries) and relevant
|
||||
metadata.
|
||||
'''
|
||||
search_q = ''
|
||||
search_query = ''
|
||||
|
||||
if type(select_cols) is list:
|
||||
select_cols = ', '.join(select_cols)
|
||||
|
||||
# construct main search query
|
||||
if search_cols and q:
|
||||
search_q = f'{{{search_cols}}} : {q}'
|
||||
if search_cols and query:
|
||||
search_query = f'{{{search_cols}}} : {query}'
|
||||
|
||||
# add auxiliary search constraints
|
||||
if colq:
|
||||
search_q += f' {colq}'
|
||||
if col_query:
|
||||
search_query += f' {col_query}'
|
||||
|
||||
search_q = search_q.strip()
|
||||
search_query = search_query.strip()
|
||||
|
||||
hl_start = '<b><mark>'
|
||||
hl_end = '</mark></b>'
|
||||
@ -99,8 +91,8 @@ class FTSAccessor(Accessor):
|
||||
'''
|
||||
|
||||
where_clauses = []
|
||||
if search_q:
|
||||
where_clauses.append(f"{fts_table_name} MATCH '{search_q}'\n")
|
||||
if search_query:
|
||||
where_clauses.append(f"{fts_table_name} MATCH '{search_query}'\n")
|
||||
|
||||
if wherein_dict:
|
||||
for col, vals in wherein_dict.items():
|
||||
@ -112,36 +104,6 @@ class FTSAccessor(Accessor):
|
||||
|
||||
sql += f'ORDER BY rank LIMIT {limit};'
|
||||
|
||||
row_dicts, cols = self.raw_select(sql, include_cols=True)
|
||||
row_dicts, cols = self.sql_accessor.raw_select(connection, sql, include_cols=True)
|
||||
|
||||
if group_by is None:
|
||||
return row_dicts, cols
|
||||
|
||||
if agg_cols is None:
|
||||
agg_cols = []
|
||||
|
||||
# "group by" block ID and wrangle the links into a list
|
||||
# note we can't perform native GROUP BYs with FTS results
|
||||
group_by_idx = {}
|
||||
for row in row_dicts:
|
||||
group_by_attr = row.get(group_by)
|
||||
|
||||
# add new entries
|
||||
for agg_col in agg_cols:
|
||||
row[f'{agg_col}_agg'] = set()
|
||||
|
||||
if group_by_attr is None:
|
||||
continue
|
||||
|
||||
if group_by_attr not in group_by_idx:
|
||||
group_by_idx[group_by_attr] = row
|
||||
|
||||
for agg_col in agg_cols:
|
||||
if agg_col in row:
|
||||
group_by_idx[group_by_attr][f'{agg_col}_agg'].add(row[agg_col])
|
||||
|
||||
return {
|
||||
'results' : group_by_idx,
|
||||
'columns' : cols,
|
||||
'num_results' : len(row_dicts),
|
||||
}
|
||||
return row_dicts, cols
|
||||
|
@ -1,62 +1,58 @@
|
||||
'''
|
||||
Design proposal: variable backends
|
||||
.. admonition:: Design proposal: variable backends
|
||||
|
||||
One particular feature not supported by the current type hierarchy is the possible use of
|
||||
different backends to implement a general interface like SQLAccessor. One could imagine,
|
||||
for instance, using ``sqlalchemy`` or ``sqlite`` to define the same methods laid out in a
|
||||
parent class blueprint. It's not too difficult to imagine cases where both of these may be
|
||||
useful, but for now it is outside the development scope. Should it ever enter the scope,
|
||||
however, we might consider a simple ``backend`` argument on instantiation, keeping just the
|
||||
SQLAccessor exposed rather than a whole set of backend-specific types:
|
||||
One particular feature not supported by the current type hierarchy is the possible use of
|
||||
different backends to implement a general interface like SQLAccessor. One could imagine,
|
||||
for instance, using ``sqlalchemy`` or ``sqlite`` to define the same methods laid out in a
|
||||
parent class blueprint. It's not too difficult to imagine cases where both of these may be
|
||||
useful, but for now it is outside the development scope. Should it ever enter the scope,
|
||||
however, we might consider a simple ``backend`` argument on instantiation, keeping just the
|
||||
SQLAccessor exposed rather than a whole set of backend-specific types:
|
||||
|
||||
.. code-block:: python
|
||||
.. code-block:: python
|
||||
|
||||
class SQLAlchemyAccessor(RelationalAccessor): # may also inherit from a dedicated interface parent
|
||||
def select(...):
|
||||
...
|
||||
class SQLAlchemyAccessor(RelationalAccessor): # may also inherit from a dedicated interface parent
|
||||
def select(...):
|
||||
...
|
||||
|
||||
class SQLiteAccessor(RelationalAccessor):
|
||||
def select(...):
|
||||
...
|
||||
class SQLiteAccessor(RelationalAccessor):
|
||||
def select(...):
|
||||
...
|
||||
|
||||
class SQLAccessor(RelationalAccessor):
|
||||
backends = {
|
||||
'sqlalchemy': SQLAlchemyAccessor,
|
||||
'sqlite': SQLteAccessor,
|
||||
}
|
||||
class SQLAccessor(RelationalAccessor):
|
||||
backends = {
|
||||
'sqlalchemy': SQLAlchemyAccessor,
|
||||
'sqlite': SQLteAccessor,
|
||||
}
|
||||
|
||||
def __init__(self, backend: str):
|
||||
self.backend = self.backends.get(backend)
|
||||
def __init__(self, backend: str):
|
||||
self.backend = self.backends.get(backend)
|
||||
|
||||
def select(...):
|
||||
return self.backend.select(...)
|
||||
def select(...):
|
||||
return self.backend.select(...)
|
||||
|
||||
|
||||
For now, we can look at SQLAccessor (and equivalents in other type hierarchies, like
|
||||
SQLManagers) as being SQLAlchemyAccessors and not supporting any backend swapping. But in
|
||||
theory, to make the above change, we'd just need to rename it and wrap it up.
|
||||
For now, we can look at SQLAccessor (and equivalents in other type hierarchies, like
|
||||
SQLManagers) as being SQLAlchemyAccessors and not supporting any backend swapping. But in
|
||||
theory, to make the above change, we'd just need to rename it and wrap it up.
|
||||
'''
|
||||
|
||||
from pathlib import Path
|
||||
from collections.abc import Iterable
|
||||
import inspect
|
||||
from functools import cache
|
||||
from pathlib import Path
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from co3 import util
|
||||
from co3.engines import SQLEngine
|
||||
from co3.accessor import Accessor
|
||||
from co3.components import Relation, SQLTable
|
||||
|
||||
|
||||
class RelationalAccessor[R: Relation](Accessor[R]):
|
||||
@staticmethod
|
||||
def raw_select(
|
||||
self,
|
||||
connection,
|
||||
text: str
|
||||
):
|
||||
#connection.exec
|
||||
raise NotImplementedError
|
||||
|
||||
def select(
|
||||
@ -110,6 +106,7 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
|
||||
bind_params=bind_params,
|
||||
include_cols=include_cols
|
||||
)
|
||||
self.log_access(sql)
|
||||
|
||||
if mappings:
|
||||
return res.mappings().all()
|
||||
@ -165,6 +162,7 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
|
||||
statement = statement.limit(limit)
|
||||
|
||||
res = SQLEngine.execute(connection, statement, include_cols=include_cols)
|
||||
self.log_access(statement)
|
||||
|
||||
if mappings:
|
||||
return res.mappings().all()
|
||||
@ -181,11 +179,15 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
|
||||
returned dictionaries. This information is not available under CursorResults and thus
|
||||
must be provided separately. This will yield results like the following:
|
||||
|
||||
[..., {'table1.col':<value>, 'table2.col':<value>, ...}, ...]
|
||||
.. code-block:: python
|
||||
|
||||
[..., {'table1.col':<value>, 'table2.col':<value>, ...}, ...]
|
||||
|
||||
Instead of the automatic mapping names:
|
||||
|
||||
[..., {'col':<value>, 'col_1':<value>, ...}, ...]
|
||||
.. code-block:: python
|
||||
|
||||
[..., {'col':<value>, 'col_1':<value>, ...}, ...]
|
||||
|
||||
which can make accessing certain results a little more intuitive.
|
||||
'''
|
||||
|
@ -1,10 +1,9 @@
|
||||
import time
|
||||
import pickle
|
||||
import logging
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
import sqlalchemy as sa
|
||||
#from sentence_transformers import SentenceTransformer, util
|
||||
|
||||
from co3.accessor import Accessor
|
||||
|
||||
@ -55,10 +54,11 @@ class VSSAccessor(Accessor):
|
||||
normalize_embeddings = True
|
||||
)
|
||||
|
||||
def search(
|
||||
def select(
|
||||
self,
|
||||
query : str,
|
||||
connection,
|
||||
index_name : str,
|
||||
query : str,
|
||||
limit : int = 10,
|
||||
score_threshold = 0.5,
|
||||
):
|
||||
|
@ -111,6 +111,9 @@ class SQLTable(Relation[SQLTableLike]):
|
||||
self.obj.join(_with.obj, on, isouter=outer)
|
||||
)
|
||||
|
||||
class FTSTable(Relation[SQLTableLike]):
|
||||
pass
|
||||
|
||||
# key-value stores
|
||||
class Dictionary(Relation[dict]):
|
||||
def get_attributes(self):
|
||||
|
@ -1,12 +1,11 @@
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
import time
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
|
||||
from tqdm import tqdm
|
||||
import sqlalchemy as sa
|
||||
|
||||
from co3 import util
|
||||
|
||||
from co3.manager import Manager
|
||||
from co3.accessors.sql import SQLAccessor
|
||||
|
||||
@ -14,154 +13,24 @@ from co3.accessors.sql import SQLAccessor
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class FTSManager(Manager):
|
||||
def __init__(self, database, engine=None):
|
||||
super().__init__(database, engine)
|
||||
|
||||
self.accessor = SQLAccessor(engine)
|
||||
self.composer = CoreComposer()
|
||||
|
||||
def recreate_simple(self):
|
||||
select_cols = [
|
||||
tables.files.c.rpath, tables.files.c.ftype,
|
||||
|
||||
tables.notes.c.title, tables.notes.c.link,
|
||||
|
||||
tables.note_conversion_matter.c.type,
|
||||
|
||||
tables.blocks.c.name, tables.blocks.c.header,
|
||||
tables.blocks.c.elem_pos,
|
||||
|
||||
tables.block_conversion_matter.c.content,
|
||||
|
||||
tables.links.c.target,
|
||||
#tables.links.c.target.label('link_target'),
|
||||
]
|
||||
def __init__(self):
|
||||
self.sql_accessor = SQLAccessor()
|
||||
|
||||
def recreate_from_table(self, table: str, cols):
|
||||
inserts, res_cols = self.accessor.select(
|
||||
'leftward_link_conversions',
|
||||
cols=select_cols,
|
||||
table,
|
||||
cols=cols,
|
||||
include_cols=True,
|
||||
)
|
||||
|
||||
util.db.populate_fts5(
|
||||
self.engine,
|
||||
['search'],
|
||||
#columns=select_cols,
|
||||
columns=res_cols,
|
||||
inserts=inserts,
|
||||
)
|
||||
|
||||
def recreate_pivot(self):
|
||||
block_cols = [
|
||||
tables.files.c.rpath, tables.files.c.ftype,
|
||||
|
||||
tables.notes.c.title, tables.notes.c.link,
|
||||
|
||||
tables.note_conversion_matter.c.type,
|
||||
|
||||
tables.blocks.c.name, tables.blocks.c.header,
|
||||
tables.blocks.c.elem_pos,
|
||||
|
||||
#tables.block_conversion_matter.c.content,
|
||||
|
||||
#tables.links.c.target,
|
||||
|
||||
#tables.links.c.target.label('link_target'),
|
||||
]
|
||||
|
||||
lcp = self.accessor.link_content_pivot(cols=block_cols)
|
||||
|
||||
start = time.time()
|
||||
inserts, cols = self.accessor.select(
|
||||
lcp,
|
||||
#cols=select_cols
|
||||
include_cols=True,
|
||||
)
|
||||
logger.info(f'FTS recreate: pre-index SELECT took {time.time()-start:.2f}s')
|
||||
|
||||
return inserts
|
||||
|
||||
#util.db.populate_fts5(
|
||||
# self.engine,
|
||||
# ['search'],
|
||||
# #columns=select_cols,
|
||||
# columns=res_cols,
|
||||
# inserts=inserts,
|
||||
#)
|
||||
|
||||
def recreate(self):
|
||||
select_cols = [
|
||||
tables.links.c.id,
|
||||
|
||||
tables.files.c.rpath, tables.files.c.ftype,
|
||||
|
||||
tables.files.c.title, tables.files.c.link,
|
||||
|
||||
tables.notes.c.type,
|
||||
|
||||
tables.elements.c.name, tables.blocks.c.header,
|
||||
tables.elements.c.elem_pos,
|
||||
|
||||
tables.element_conversions.c.format,
|
||||
tables.element_conversions.c.content,
|
||||
|
||||
tables.links.c.target,
|
||||
tables.links.c.type.label('link_type'),
|
||||
#tables.links.c.target.label('link_target'),
|
||||
]
|
||||
|
||||
start = time.time()
|
||||
|
||||
fts_basis = self.composer.get_table('leftward_link_conversions')
|
||||
inserts = self.accessor.select(
|
||||
fts_basis,
|
||||
cols=select_cols,
|
||||
)
|
||||
logger.info(f'FTS recreate: pre-index SELECT took {time.time()-start:.2f}s')
|
||||
|
||||
start = time.time()
|
||||
flattened_blocks = defaultdict(lambda:defaultdict(lambda:None))
|
||||
other_results = []
|
||||
|
||||
# flatten conversions across supported formats
|
||||
# i.e. put "src" and "html" into columns within a single row
|
||||
for row in inserts:
|
||||
name = row['name']
|
||||
|
||||
# remove pure identifier rows
|
||||
link_id = row.pop('id')
|
||||
_format = row.pop('format')
|
||||
content = row.pop('content')
|
||||
|
||||
# add new derived rows
|
||||
row['src'] = None
|
||||
row['html'] = None
|
||||
|
||||
if name is None:
|
||||
other_results.append(row)
|
||||
continue
|
||||
|
||||
if flattened_blocks[name][link_id] is None:
|
||||
flattened_blocks[name][link_id] = row
|
||||
|
||||
nrow = flattened_blocks[name][link_id]
|
||||
if _format == 'src':
|
||||
nrow['src'] = content
|
||||
elif _format == 'html5':
|
||||
nrow['html'] = content
|
||||
|
||||
inserts = other_results
|
||||
for name, link_dict in flattened_blocks.items():
|
||||
for link_id, row in link_dict.items():
|
||||
inserts.append(row)
|
||||
|
||||
# should have at least one result to index
|
||||
assert len(inserts) > 0
|
||||
cols = inserts[0].keys()
|
||||
|
||||
# all keys should match
|
||||
assert all([cols == d.keys() for d in inserts])
|
||||
|
||||
logger.info(f'FTS recreate: insert post-processing took {time.time()-start:.2f}s')
|
||||
|
||||
util.db.populate_fts5(
|
||||
@ -177,40 +46,3 @@ class FTSManager(Manager):
|
||||
|
||||
def migrate(self): pass
|
||||
|
||||
|
||||
|
||||
def recreate_old(self):
|
||||
full_table = schema.leftward_block_conversions()
|
||||
primitive_tables = schema.primitive_tables()
|
||||
|
||||
virtual_inserts = []
|
||||
for ptype, table in primitive_tables.items():
|
||||
table_rows = db.util.sa_exec_dicts(self.engine, sa.select(table))
|
||||
for row in table_rows:
|
||||
pdict = util.db.prepare_insert(full_table, row)
|
||||
pdict['ptype'] = ptype
|
||||
|
||||
virtual_inserts.append(pdict)
|
||||
|
||||
select_cols = [
|
||||
tables.blocks.c.name, tables.blocks.c.content,
|
||||
tables.blocks.c.html, tables.blocks.c.header,
|
||||
tables.blocks.c.block_pos, tables.blocks.c.note_name,
|
||||
|
||||
tables.links.c.to_file,
|
||||
|
||||
tables.notes.c.link,
|
||||
|
||||
tables.note_conversion_matter.c.type,
|
||||
tables.note_conversion_matter.c.summary,
|
||||
|
||||
'ptype',
|
||||
]
|
||||
|
||||
util.db.populate_fts5(
|
||||
self.engine,
|
||||
'search',
|
||||
columns=select_cols,
|
||||
inserts=virtual_inserts,
|
||||
)
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
'''
|
||||
Note: Common on insert behavior
|
||||
.. admonition:: Common on insert behavior
|
||||
|
||||
- Tables with unique constraints have been equipped with ``sqlite_on_conflict_unique``
|
||||
flags, enabling conflicting bulk inserts to replace conflicting rows gracefully. No
|
||||
need to worry about explicitly handling upserts.
|
||||
@ -9,7 +10,8 @@ Note: Common on insert behavior
|
||||
bulk calls AND allow update/replacements on conflict; the setting above allows this
|
||||
bulk usage to continue as is.
|
||||
|
||||
Note: Options for insert/update model
|
||||
.. admonition:: Options for insert/update model
|
||||
|
||||
1. Collect all possible update objects, and use a SQLite bulk insert that only updates
|
||||
if needed (based on modified time, for example). This sounds on the surface like
|
||||
the right approach since we defer as much to bulk SQL logic, but it's far and away
|
||||
@ -50,8 +52,6 @@ from co3.engines import SQLEngine
|
||||
from co3.manager import Manager
|
||||
from co3.components import Relation, SQLTable
|
||||
|
||||
#from localsys.reloader.router._base import ChainRouter, Event
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -76,24 +76,21 @@ class SQLManager(RelationalManager[SQLTable]):
|
||||
def __init__(self, *args, **kwargs):
|
||||
'''
|
||||
The insert lock is a _reentrant lock_, meaning the same thread can acquire the
|
||||
lock again with out deadlocking (simplifying across methods of this class that
|
||||
lock again without deadlocking (simplifying across methods of this class that
|
||||
need it).
|
||||
'''
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.routers = []
|
||||
|
||||
self._router = None
|
||||
self._insert_lock = threading.RLock()
|
||||
|
||||
@property
|
||||
def router(self):
|
||||
if self._router is None:
|
||||
self._router = ChainRouter(self.routers)
|
||||
return self._router
|
||||
def update(self):
|
||||
pass
|
||||
|
||||
def add_router(self, router):
|
||||
self.routers.append(router)
|
||||
def migrate(self):
|
||||
pass
|
||||
|
||||
def sync(self):
|
||||
pass
|
||||
|
||||
def recreate(
|
||||
self,
|
||||
@ -112,8 +109,6 @@ class SQLManager(RelationalManager[SQLTable]):
|
||||
metadata.drop_all(engine.manager)
|
||||
metadata.create_all(engine.manager, checkfirst=True)
|
||||
|
||||
def update(self): pass
|
||||
|
||||
def insert(
|
||||
self,
|
||||
connection,
|
||||
@ -167,246 +162,3 @@ class SQLManager(RelationalManager[SQLTable]):
|
||||
|
||||
return res_list
|
||||
|
||||
def _file_sync_bools(self):
|
||||
synced_bools = []
|
||||
fpaths = utils.paths.iter_nested_paths(self.collector.basepath, no_dir=True)
|
||||
db_rpath_index = self.database.files.group_by('rpath')()
|
||||
|
||||
for fpath in fpaths:
|
||||
file = File(fpath, self.collector.basepath)
|
||||
db_file = db_rpath_index.get(file.rpath)
|
||||
|
||||
file_in_sync = db_file and float(db_file.get('mtime','0')) >= file.mtime
|
||||
synced_bools.append(file_in_sync)
|
||||
|
||||
return fpaths, synced_bools
|
||||
|
||||
def sync(
|
||||
self,
|
||||
limit=0,
|
||||
chunk_time=0,
|
||||
dry=False,
|
||||
chunk_size_cap=1000,
|
||||
print_report=True,
|
||||
):
|
||||
'''
|
||||
Note:
|
||||
Could be dangerous if going from fast file processing to note processing.
|
||||
Imagine estimating 1000 iter/sec, then transferring that to the next batch
|
||||
when it's more like 0.2 iter/sec. We would lose any chunking. (Luckily, in
|
||||
practice, turns out that notes are almost always processed before the huge set
|
||||
of nested files lurking and we don't experience this issue.)
|
||||
'''
|
||||
#filepaths, update_bools = self.collector.file_inserts(dry_run=True)
|
||||
#fpaths_to_update = [f for (f, b) in zip(filepaths, update_bools) if not b]
|
||||
filepaths, update_bools = self._file_sync_bools()
|
||||
fpaths_to_update = [f for (f, b) in zip(filepaths, update_bools) if not b]
|
||||
|
||||
def calc_chunk_size(sec_per_file):
|
||||
'''
|
||||
Chunk cap only applied here; if manually set elsewhere in the method, it's
|
||||
intentional e.g., if chunk_time <= 0 and we just want one big iteration.
|
||||
'''
|
||||
return min(max(int(chunk_time / sec_per_file), 1), chunk_size_cap)
|
||||
|
||||
# nothing to do
|
||||
if not fpaths_to_update:
|
||||
logger.info('Sync has nothing to do, exiting')
|
||||
return None
|
||||
|
||||
ood_count = len(fpaths_to_update)
|
||||
total_fps = len(filepaths)
|
||||
ood_prcnt = ood_count/total_fps*100
|
||||
logger.info(
|
||||
f'Pre-sync scan yielded {ood_count}/{total_fps} ({ood_prcnt:.2f}%) out-of-date files'
|
||||
)
|
||||
|
||||
if limit <= 0: limit = ood_count
|
||||
|
||||
if chunk_time <= 0:
|
||||
chunk_size = ood_count
|
||||
else:
|
||||
chunk_size = calc_chunk_size(5)
|
||||
|
||||
dry_inserts = None
|
||||
remaining = limit
|
||||
|
||||
chunk_time_used = 0
|
||||
pbar = tqdm(
|
||||
desc=f'Adaptive chunked sync for [limit {limit}]',
|
||||
total=remaining,
|
||||
)
|
||||
|
||||
report = []
|
||||
def collect_report():
|
||||
report.append({
|
||||
'chunk_size': chunk_limit,
|
||||
'chunk_time_used': chunk_time_used,
|
||||
})
|
||||
|
||||
with pbar as _:
|
||||
while remaining > 0:
|
||||
time_pcnt = chunk_time_used / max(chunk_time,1) * 100
|
||||
pbar.set_description(
|
||||
f'Adaptive chunked sync [size {chunk_size} (max {chunk_size_cap})] '
|
||||
f'[prev chunk {chunk_time_used:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]'
|
||||
)
|
||||
|
||||
chunk_limit = min(remaining, chunk_size)
|
||||
start_idx = limit - remaining
|
||||
chunk_fpaths = fpaths_to_update[start_idx:start_idx+chunk_limit]
|
||||
|
||||
chunk_time_start = time.time()
|
||||
#inserts = self.collector.collect_inserts(
|
||||
# chunk_fpaths,
|
||||
# skip_updated=False, # saves on DB check given provided list
|
||||
#)
|
||||
|
||||
# 1) flush synthetic events for the batch through the chained router
|
||||
# 2) block until completed and sweep up the collected inserts
|
||||
futures = self.router.submit([
|
||||
Event(
|
||||
endpoint=str(self.collector.basepath),
|
||||
name=str(path.relative_to(self.collector.basepath)),
|
||||
action=[0xffffffff], # synthetic action to match any flag filters
|
||||
) for path in chunk_fpaths
|
||||
])
|
||||
|
||||
for future in tqdm(
|
||||
as_completed(futures),
|
||||
total=chunk_size,
|
||||
desc=f'Awaiting chunk futures [submitted {len(futures)}/{chunk_size}]'
|
||||
):
|
||||
#print('Finished future', cfuture)
|
||||
try:
|
||||
result = future.result()
|
||||
except Exception as e:
|
||||
logger.warning(f"Sync job failed with exception {e}")
|
||||
|
||||
#wait(futures)#, timeout=10)
|
||||
inserts = self.collector.collect_inserts()
|
||||
|
||||
chunk_time_used = time.time() - chunk_time_start
|
||||
|
||||
collect_report()
|
||||
#if not inserts: break
|
||||
|
||||
if dry:
|
||||
if dry_inserts is None:
|
||||
dry_inserts = inserts
|
||||
else:
|
||||
for ik, iv in inserts.items():
|
||||
dry_inserts[ik].extend(iv)
|
||||
else:
|
||||
self.insert(inserts)
|
||||
|
||||
remaining -= chunk_limit
|
||||
|
||||
# re-calculate chunk size
|
||||
sec_per_file = chunk_time_used / chunk_size
|
||||
chunk_size = calc_chunk_size(sec_per_file)
|
||||
|
||||
#pbar.update(n=limit-remaining)
|
||||
pbar.update(n=chunk_limit)
|
||||
|
||||
if print_report:
|
||||
num_chunks = len(report)
|
||||
total_ins = sum(map(lambda c:c['chunk_size'], report))
|
||||
avg_chunk = total_ins / num_chunks
|
||||
total_time = sum(map(lambda c:c['chunk_time_used'], report))
|
||||
avg_time = total_time / num_chunks
|
||||
time_match = avg_time / max(chunk_time*100, 1)
|
||||
|
||||
print( 'Sync report: ')
|
||||
print(f' Total chunks : {num_chunks} ')
|
||||
print(f' Total file inserts : {total_ins} / {limit} ')
|
||||
print(f' Average chunk size : {avg_chunk:.2f} ')
|
||||
print(f' Total time spent : {total_time:.2f}s ')
|
||||
print(f' Average time / chunk : {avg_time:.2f}s / {chunk_time}s')
|
||||
print(f' Percent time match : {time_match:.2f}% ')
|
||||
|
||||
if dry: return dry_inserts
|
||||
|
||||
def migrate(self):
|
||||
# create MD object representation current DB state
|
||||
pre_metadata = sa.MetaData()
|
||||
pre_metadata.reflect(bind=self.engine)
|
||||
|
||||
table_rows = {}
|
||||
for table_name, table in pre_metadata.tables.items():
|
||||
# pre-migration rows
|
||||
select_results = utils.db.sa_execute(self.engine, sa.select(table))
|
||||
named_results = utils.db.named_results(table, select_results)
|
||||
|
||||
table_rows[table_name] = named_results
|
||||
|
||||
|
||||
#table_update_map = {
|
||||
# 'files': (
|
||||
# lambda r: File(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
|
||||
# ),
|
||||
# 'notes': (
|
||||
# lambda r: Note(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
|
||||
# ),
|
||||
# 'note_conversion_matter': (
|
||||
# lambda r: Note(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
|
||||
# ),
|
||||
#}
|
||||
|
||||
# update schema
|
||||
self.recreate()
|
||||
|
||||
with self.engine.connect() as connection:
|
||||
for table_name, table in tables.metadata.tables.items():
|
||||
if table_name not in table_rows: continue
|
||||
|
||||
logger.info(f'Migrating table "{table_name}"')
|
||||
|
||||
if table_name == 'files':
|
||||
update_note_rows = []
|
||||
|
||||
for i,note_row in enumerate(table_rows['files']):
|
||||
file = File(Path(NOTEPATH, note_row['name']), NOTEPATH)
|
||||
u_note_row = utils.db.prepare_insert(
|
||||
tables.files,
|
||||
file.get_table_data()
|
||||
)
|
||||
note_row.update({k:v for k,v in u_note_row.items() if v})
|
||||
update_note_rows.append(note_row)
|
||||
|
||||
table_rows['files'] = update_note_rows
|
||||
elif table_name == 'notes':
|
||||
update_note_rows = []
|
||||
|
||||
for note_row in table_rows['notes']:
|
||||
note = Note(Path(NOTEPATH, note_row['name']), NOTEPATH)
|
||||
u_note_row = utils.db.prepare_insert(
|
||||
tables.notes,
|
||||
note.get_table_data()
|
||||
)
|
||||
note_row.update({k:v for k,v in u_note_row.items() if v})
|
||||
update_note_rows.append(note_row)
|
||||
|
||||
table_rows['notes'] = update_note_rows
|
||||
|
||||
# generic update blueprint for later; now doesn't matter as conversion based
|
||||
# inserts require re-converting...so you may as well just do a full re-build.
|
||||
#update_rows = []
|
||||
#if table_name in table_update_map:
|
||||
# for i,row in enumerate(table_rows['files']):
|
||||
# update_obj = table_update_map[table_name](row)
|
||||
|
||||
# u_row = utils.db.prepare_insert(
|
||||
# table,
|
||||
# note.get_table_data()
|
||||
# )
|
||||
# note_row.update({k:v for k,v in u_note_row.items() if v})
|
||||
# update_note_rows.append(note_row)
|
||||
|
||||
connection.execute(
|
||||
sa.insert(table),
|
||||
table_rows[table_name]
|
||||
)
|
||||
|
||||
connection.commit()
|
||||
|
||||
|
@ -1,142 +1,16 @@
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
import pickle
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
from tqdm import tqdm
|
||||
import sqlalchemy as sa
|
||||
|
||||
from co3.manager import Manager
|
||||
from co3 import util
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class VSSManager(Manager):
|
||||
def __init__(self, database, engine=None):
|
||||
super().__init__(database, engine)
|
||||
def recreate(self): pass
|
||||
|
||||
from localsys.db.databases.core import CoreDatabase
|
||||
|
||||
self.core_db = CoreDatabase()
|
||||
|
||||
def recreate(self):
|
||||
start = time.time()
|
||||
|
||||
chunks = []
|
||||
chunk_ids = []
|
||||
block_map = {}
|
||||
for note_name, note_groupby in self.core_db.blocks().items():
|
||||
blocks = note_groupby.get('aggregates', [])
|
||||
for block in blocks:
|
||||
if block.get('format') != 'src': continue
|
||||
|
||||
block_name = block.get('name', '')
|
||||
block_content = block.get('content', '')
|
||||
block_map[block_name] = block_content
|
||||
|
||||
block_chunks, chunk_ranges = utils.embed.split_block(block_content)
|
||||
|
||||
chunks.extend(block_chunks)
|
||||
chunk_ids.extend([f'{block_name}@{r[0]}:{r[1]}' for r in chunk_ranges])
|
||||
|
||||
logger.info(f'VSS pre-index SELECT took {time.time()-start:.2f}s')
|
||||
start = time.time()
|
||||
|
||||
# one-time batched embedding
|
||||
chunk_embeddings = self.database.access.embed_chunks(chunks)
|
||||
|
||||
logger.info(f'VSS embeddings took {time.time()-start:.2f}s')
|
||||
start = time.time()
|
||||
|
||||
# aggregate embeddings up the content hierarchy
|
||||
zero_embed = lambda: np.atleast_2d(
|
||||
np.zeros(self.database.access._embedding_size, dtype='float32')
|
||||
)
|
||||
block_embedding_map = defaultdict(zero_embed)
|
||||
note_embedding_map = defaultdict(zero_embed)
|
||||
|
||||
for i, chunk_id in enumerate(chunk_ids):
|
||||
at_split = chunk_id.split('@')
|
||||
note_name = at_split[0]
|
||||
block_name = '@'.join(at_split[:3])
|
||||
|
||||
block_embedding_map[block_name] += chunk_embeddings[i]
|
||||
note_embedding_map[note_name] += chunk_embeddings[i]
|
||||
|
||||
# re-normalize aggregated embeddings
|
||||
# > little inefficient but gobbled up asymptotically by chunk processing
|
||||
# > could o/w first convert each map to a full 2D np array, then back
|
||||
for embed_map in [block_embedding_map, note_embedding_map]:
|
||||
for k, agg_embedding in embed_map.items():
|
||||
embed_map[k] = utils.embed.unit_row_wise(agg_embedding)
|
||||
|
||||
logger.info(f'VSS recreate: pre-index SELECT took {time.time()-start:.2f}s')
|
||||
|
||||
# cols = ['rowid', 'chunk_embedding']
|
||||
# def pack_inserts(embeddings):
|
||||
# return [
|
||||
# { cols[0]: i, cols[1]: embedding.tobytes() }
|
||||
# for i, embedding in enumerate(embeddings)
|
||||
# ]
|
||||
|
||||
# chunk_inserts = pack_inserts(chunk_embeddings)
|
||||
# block_inserts = pack_inserts(block_embedding_map.values())
|
||||
# note_inserts = pack_inserts(note_embedding_map.values())
|
||||
|
||||
chunk_ids = chunk_ids
|
||||
block_ids = list(block_embedding_map.keys())
|
||||
note_ids = list(note_embedding_map.keys())
|
||||
|
||||
block_embeddings = np.vstack(list(block_embedding_map.values()))
|
||||
note_embeddings = np.vstack(list(note_embedding_map.values()))
|
||||
|
||||
blocks = [block_map[b] for b in block_ids]
|
||||
notes = note_ids
|
||||
|
||||
self.database.access.write_embeddings({
|
||||
'chunks': (chunk_ids, chunk_embeddings, chunks),
|
||||
'blocks': (block_ids, block_embeddings, blocks),
|
||||
'notes' : (note_ids, note_embeddings, notes),
|
||||
})
|
||||
|
||||
logger.info(f'Post-embedding parsing took {time.time()-start:.2f}s')
|
||||
|
||||
# utils.db.create_vss0(
|
||||
# self.engine,
|
||||
# 'chunks',
|
||||
# columns=list(cols),
|
||||
# inserts=chunk_inserts,
|
||||
# populate=True,
|
||||
# reset=True,
|
||||
# embedding_size=self._embedding_size,
|
||||
# )
|
||||
|
||||
def update(self): pass
|
||||
def insert(self): pass
|
||||
|
||||
def sync(self): pass
|
||||
|
||||
def migrate(self): pass
|
||||
|
||||
def _archive_note_embed(self):
|
||||
for block in blocks:
|
||||
content = block.get('content')
|
||||
if not content: continue
|
||||
|
||||
block_chunks = utils.embed.split_block(content)
|
||||
chunks.extend(block_chunks)
|
||||
|
||||
embeddings = utils.embed.embed_chunks(chunks)
|
||||
block_embeddings = utils.embed.map_groups(embeddings, embed_block_map)
|
||||
|
||||
self.convert_map['vect'] = block_embeddings
|
||||
|
||||
note_embedding = utils.embed.unit_row_wise(
|
||||
embeddings.sum(axis=0).reshape(1, -1)
|
||||
)
|
||||
|
||||
return {
|
||||
'content': note_embedding.tobytes(),
|
||||
}
|
||||
|
@ -59,6 +59,7 @@ class Mapper[C: Component]:
|
||||
inserts (hence why we tie Mappers to Schemas one-to-one).
|
||||
|
||||
.. admonition:: Dev note
|
||||
|
||||
the Composer needs reconsideration, or at least its positioning directly in this
|
||||
class. It may be more appropriate to have at the Schema level, or even just
|
||||
dissolved altogether if arbitrary named Components can be attached to schemas.
|
||||
|
Loading…
Reference in New Issue
Block a user