diff --git a/.gitignore b/.gitignore
index d46023a..d8bede4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@ __pycache__/
.pytest_cache/
localsys.egg-info/
.ipynb_checkpoints/
+.pytest_cache/
.python-version
# vendor and build files
diff --git a/co3.egg-info/SOURCES.txt b/co3.egg-info/SOURCES.txt
index 4df4b94..e27738d 100644
--- a/co3.egg-info/SOURCES.txt
+++ b/co3.egg-info/SOURCES.txt
@@ -36,4 +36,9 @@ co3/schemas/__init__.py
co3/util/__init__.py
co3/util/db.py
co3/util/regex.py
-co3/util/types.py
\ No newline at end of file
+co3/util/types.py
+tests/test_co3.py
+tests/test_database.py
+tests/test_imports.py
+tests/test_mapper.py
+tests/test_schema.py
\ No newline at end of file
diff --git a/co3/accessor.py b/co3/accessor.py
index cf08ab2..a7f941b 100644
--- a/co3/accessor.py
+++ b/co3/accessor.py
@@ -1,10 +1,9 @@
'''
-Accessor
-
Provides access to an underlying schema through a supported set of operations. Class
methods could be general, high-level SQL wrappers, or convenience functions for common
schema-specific queries.
'''
+import time
import inspect
from pathlib import Path
from collections import defaultdict
@@ -23,7 +22,16 @@ class Accessor[C: Component](metaclass=ABCMeta):
Parameters:
engine: SQLAlchemy engine to use for queries. Engine is initialized dynamically as
a property (based on the config) if not provided
+
+ Instance variables:
+ access_log: time-indexed log of access queries performed
'''
+ def __init__(self):
+ self.access_log = {}
+
+ def log_access(self, stmt):
+ self.access_log[time.time()] = f'{stmt}'
+
@abstractmethod
def raw_select(
self,
@@ -36,7 +44,7 @@ class Accessor[C: Component](metaclass=ABCMeta):
def select(
self,
connection,
- component: C,
+ component: str | C,
*args,
**kwargs
):
diff --git a/co3/accessors/fts.py b/co3/accessors/fts.py
index 16c5a4e..ed25d71 100644
--- a/co3/accessors/fts.py
+++ b/co3/accessors/fts.py
@@ -2,25 +2,37 @@ import sqlalchemy as sa
from co3 import util
from co3.accessor import Accessor
+from co3.accessors.sql import SQLAccessor
class FTSAccessor(Accessor):
- def search(
+ '''
+ Perform queries on efficient full-text search (FTS) tables.
+
+ Note how this class doesn't inherit from ``SQLAccessor``, or even
+ ``RelationalAccessor``. We don't look at FTS tables as Relations, due to restrictions
+ on their JOIN capabilities, and avoid embracing what composability they may have to
+ prevent accidentally introducing inefficiencies. Instead, just single FTS tables can
+ be selected from via this Accessor, and the FTSTable type, despite the use of the word
+ "table", is a direct child of the Component type.
+ '''
+ def __init__(self):
+ self.sql_accessor = SQLAccessor()
+ self.access_log = sql_accessor.access_log
+
+ def select(
self,
+ connection,
table_name : str,
- select_cols : str | list | None = '*',
- search_cols : str | None = None,
- q : str | None = None,
- colq : str | None = None,
- snip_col : int | None = 0,
- hl_col : int | None = 0,
- limit : int | None = 100,
- snip : int | None = 64,
- tokenizer : str | None = 'unicode61',
- group_by : str | None = None,
- agg_cols : list | None = None,
- wherein_dict: dict | None = None,
- unique_on : dict | None = None,
+ select_cols : str | list | None = '*',
+ search_cols : str | None = None,
+ query : str | None = None,
+ col_query : str | None = None,
+ snip_col : int | None = 0,
+ hl_col : int | None = 0,
+ limit : int | None = 100,
+ snip : int | None = 64,
+ tokenizer : str | None = 'unicode61',
):
'''
Execute a search query against an indexed FTS table for specific primitives. This
@@ -33,23 +45,6 @@ class FTSAccessor(Accessor):
``MATCH``-based score for the text & column queries. Results are (a list of) fully
expanded dictionaries housing column-value pairs.
- Note:
- GROUP BY cannot be paired with SQLITE FTS extensions; thus, we perform manual
- group checks on the result set in Python before response
-
- Analysis:
- The returned JSON structure has been (loosely) optimized for speed on the client
- side. Fully forming individual dictionary based responses saves time in
- Javascript, as the JSON parser is expected to be able to create the objects
- faster than post-hoc construction in JS. This return structure was compared
- against returning an array of arrays (all ordered in the same fashion), along with
- a column list to be associated with each of the result values. While this saves
- some size on the payload (the same column names don't have to be transmitted for
- each result), the size of the returned content massively outweighs the
- predominantly short column names. The only way this structure would be viable is
- if a significant amount was saved on transfer compared to the slow down in JS
- object construction; this is (almost) never the case.
-
Parameters:
table_name : name of FTS table to search
search_cols : space separated string of columns to use for primary queries
@@ -62,28 +57,25 @@ class FTSAccessor(Accessor):
limit : maximum number of results to return in the SQL query
snip : snippet length (max: 64)
tokenizer : tokenizer to use (assumes relevant FTS table has been built)
- ...
- wherein_dict: (col-name, value-list) pairs to match result set against, via
- WHERE ... IN clauses
Returns:
Dictionary with search results (list of column indexed dictionaries) and relevant
metadata.
'''
- search_q = ''
+ search_query = ''
if type(select_cols) is list:
select_cols = ', '.join(select_cols)
# construct main search query
- if search_cols and q:
- search_q = f'{{{search_cols}}} : {q}'
+ if search_cols and query:
+ search_query = f'{{{search_cols}}} : {query}'
# add auxiliary search constraints
- if colq:
- search_q += f' {colq}'
+ if col_query:
+ search_query += f' {col_query}'
- search_q = search_q.strip()
+ search_query = search_query.strip()
hl_start = ''
hl_end = ''
@@ -99,8 +91,8 @@ class FTSAccessor(Accessor):
'''
where_clauses = []
- if search_q:
- where_clauses.append(f"{fts_table_name} MATCH '{search_q}'\n")
+ if search_query:
+ where_clauses.append(f"{fts_table_name} MATCH '{search_query}'\n")
if wherein_dict:
for col, vals in wherein_dict.items():
@@ -112,36 +104,6 @@ class FTSAccessor(Accessor):
sql += f'ORDER BY rank LIMIT {limit};'
- row_dicts, cols = self.raw_select(sql, include_cols=True)
+ row_dicts, cols = self.sql_accessor.raw_select(connection, sql, include_cols=True)
- if group_by is None:
- return row_dicts, cols
-
- if agg_cols is None:
- agg_cols = []
-
- # "group by" block ID and wrangle the links into a list
- # note we can't perform native GROUP BYs with FTS results
- group_by_idx = {}
- for row in row_dicts:
- group_by_attr = row.get(group_by)
-
- # add new entries
- for agg_col in agg_cols:
- row[f'{agg_col}_agg'] = set()
-
- if group_by_attr is None:
- continue
-
- if group_by_attr not in group_by_idx:
- group_by_idx[group_by_attr] = row
-
- for agg_col in agg_cols:
- if agg_col in row:
- group_by_idx[group_by_attr][f'{agg_col}_agg'].add(row[agg_col])
-
- return {
- 'results' : group_by_idx,
- 'columns' : cols,
- 'num_results' : len(row_dicts),
- }
+ return row_dicts, cols
diff --git a/co3/accessors/sql.py b/co3/accessors/sql.py
index cf58bab..53c794e 100644
--- a/co3/accessors/sql.py
+++ b/co3/accessors/sql.py
@@ -1,62 +1,58 @@
'''
-Design proposal: variable backends
+.. admonition:: Design proposal: variable backends
-One particular feature not supported by the current type hierarchy is the possible use of
-different backends to implement a general interface like SQLAccessor. One could imagine,
-for instance, using ``sqlalchemy`` or ``sqlite`` to define the same methods laid out in a
-parent class blueprint. It's not too difficult to imagine cases where both of these may be
-useful, but for now it is outside the development scope. Should it ever enter the scope,
-however, we might consider a simple ``backend`` argument on instantiation, keeping just the
-SQLAccessor exposed rather than a whole set of backend-specific types:
-
-.. code-block:: python
-
- class SQLAlchemyAccessor(RelationalAccessor): # may also inherit from a dedicated interface parent
- def select(...):
- ...
+ One particular feature not supported by the current type hierarchy is the possible use of
+ different backends to implement a general interface like SQLAccessor. One could imagine,
+ for instance, using ``sqlalchemy`` or ``sqlite`` to define the same methods laid out in a
+ parent class blueprint. It's not too difficult to imagine cases where both of these may be
+ useful, but for now it is outside the development scope. Should it ever enter the scope,
+ however, we might consider a simple ``backend`` argument on instantiation, keeping just the
+ SQLAccessor exposed rather than a whole set of backend-specific types:
- class SQLiteAccessor(RelationalAccessor):
- def select(...):
- ...
+ .. code-block:: python
- class SQLAccessor(RelationalAccessor):
- backends = {
- 'sqlalchemy': SQLAlchemyAccessor,
- 'sqlite': SQLteAccessor,
- }
+ class SQLAlchemyAccessor(RelationalAccessor): # may also inherit from a dedicated interface parent
+ def select(...):
+ ...
+
+ class SQLiteAccessor(RelationalAccessor):
+ def select(...):
+ ...
+
+ class SQLAccessor(RelationalAccessor):
+ backends = {
+ 'sqlalchemy': SQLAlchemyAccessor,
+ 'sqlite': SQLteAccessor,
+ }
+
+ def __init__(self, backend: str):
+ self.backend = self.backends.get(backend)
+
+ def select(...):
+ return self.backend.select(...)
- def __init__(self, backend: str):
- self.backend = self.backends.get(backend)
- def select(...):
- return self.backend.select(...)
-
-
-For now, we can look at SQLAccessor (and equivalents in other type hierarchies, like
-SQLManagers) as being SQLAlchemyAccessors and not supporting any backend swapping. But in
-theory, to make the above change, we'd just need to rename it and wrap it up.
+ For now, we can look at SQLAccessor (and equivalents in other type hierarchies, like
+ SQLManagers) as being SQLAlchemyAccessors and not supporting any backend swapping. But in
+ theory, to make the above change, we'd just need to rename it and wrap it up.
'''
-
-from pathlib import Path
-from collections.abc import Iterable
import inspect
-from functools import cache
+from pathlib import Path
import sqlalchemy as sa
-from co3 import util
from co3.engines import SQLEngine
from co3.accessor import Accessor
from co3.components import Relation, SQLTable
class RelationalAccessor[R: Relation](Accessor[R]):
+ @staticmethod
def raw_select(
self,
connection,
text: str
):
- #connection.exec
raise NotImplementedError
def select(
@@ -110,6 +106,7 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
bind_params=bind_params,
include_cols=include_cols
)
+ self.log_access(sql)
if mappings:
return res.mappings().all()
@@ -165,6 +162,7 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
statement = statement.limit(limit)
res = SQLEngine.execute(connection, statement, include_cols=include_cols)
+ self.log_access(statement)
if mappings:
return res.mappings().all()
@@ -181,11 +179,15 @@ class SQLAccessor(RelationalAccessor[SQLTable]):
returned dictionaries. This information is not available under CursorResults and thus
must be provided separately. This will yield results like the following:
- [..., {'table1.col':, 'table2.col':, ...}, ...]
+ .. code-block:: python
+
+ [..., {'table1.col':, 'table2.col':, ...}, ...]
Instead of the automatic mapping names:
- [..., {'col':, 'col_1':, ...}, ...]
+ .. code-block:: python
+
+ [..., {'col':, 'col_1':, ...}, ...]
which can make accessing certain results a little more intuitive.
'''
diff --git a/co3/accessors/vss.py b/co3/accessors/vss.py
index 46d119c..6b49882 100644
--- a/co3/accessors/vss.py
+++ b/co3/accessors/vss.py
@@ -1,10 +1,9 @@
+import time
import pickle
import logging
from pathlib import Path
-import time
import sqlalchemy as sa
-#from sentence_transformers import SentenceTransformer, util
from co3.accessor import Accessor
@@ -55,10 +54,11 @@ class VSSAccessor(Accessor):
normalize_embeddings = True
)
- def search(
+ def select(
self,
- query : str,
+ connection,
index_name : str,
+ query : str,
limit : int = 10,
score_threshold = 0.5,
):
diff --git a/co3/components/__init__.py b/co3/components/__init__.py
index 0d048b4..123bfb8 100644
--- a/co3/components/__init__.py
+++ b/co3/components/__init__.py
@@ -111,6 +111,9 @@ class SQLTable(Relation[SQLTableLike]):
self.obj.join(_with.obj, on, isouter=outer)
)
+class FTSTable(Relation[SQLTableLike]):
+ pass
+
# key-value stores
class Dictionary(Relation[dict]):
def get_attributes(self):
diff --git a/co3/managers/fts.py b/co3/managers/fts.py
index 466b52b..4a3178e 100644
--- a/co3/managers/fts.py
+++ b/co3/managers/fts.py
@@ -1,12 +1,11 @@
-from collections import defaultdict
-import logging
import time
+import logging
+from collections import defaultdict
from tqdm import tqdm
import sqlalchemy as sa
from co3 import util
-
from co3.manager import Manager
from co3.accessors.sql import SQLAccessor
@@ -14,154 +13,24 @@ from co3.accessors.sql import SQLAccessor
logger = logging.getLogger(__name__)
class FTSManager(Manager):
- def __init__(self, database, engine=None):
- super().__init__(database, engine)
-
- self.accessor = SQLAccessor(engine)
- self.composer = CoreComposer()
-
- def recreate_simple(self):
- select_cols = [
- tables.files.c.rpath, tables.files.c.ftype,
-
- tables.notes.c.title, tables.notes.c.link,
-
- tables.note_conversion_matter.c.type,
-
- tables.blocks.c.name, tables.blocks.c.header,
- tables.blocks.c.elem_pos,
-
- tables.block_conversion_matter.c.content,
-
- tables.links.c.target,
- #tables.links.c.target.label('link_target'),
- ]
+ def __init__(self):
+ self.sql_accessor = SQLAccessor()
+ def recreate_from_table(self, table: str, cols):
inserts, res_cols = self.accessor.select(
- 'leftward_link_conversions',
- cols=select_cols,
+ table,
+ cols=cols,
include_cols=True,
)
util.db.populate_fts5(
self.engine,
['search'],
- #columns=select_cols,
columns=res_cols,
inserts=inserts,
)
- def recreate_pivot(self):
- block_cols = [
- tables.files.c.rpath, tables.files.c.ftype,
-
- tables.notes.c.title, tables.notes.c.link,
-
- tables.note_conversion_matter.c.type,
-
- tables.blocks.c.name, tables.blocks.c.header,
- tables.blocks.c.elem_pos,
-
- #tables.block_conversion_matter.c.content,
-
- #tables.links.c.target,
-
- #tables.links.c.target.label('link_target'),
- ]
-
- lcp = self.accessor.link_content_pivot(cols=block_cols)
-
- start = time.time()
- inserts, cols = self.accessor.select(
- lcp,
- #cols=select_cols
- include_cols=True,
- )
- logger.info(f'FTS recreate: pre-index SELECT took {time.time()-start:.2f}s')
-
- return inserts
-
- #util.db.populate_fts5(
- # self.engine,
- # ['search'],
- # #columns=select_cols,
- # columns=res_cols,
- # inserts=inserts,
- #)
-
def recreate(self):
- select_cols = [
- tables.links.c.id,
-
- tables.files.c.rpath, tables.files.c.ftype,
-
- tables.files.c.title, tables.files.c.link,
-
- tables.notes.c.type,
-
- tables.elements.c.name, tables.blocks.c.header,
- tables.elements.c.elem_pos,
-
- tables.element_conversions.c.format,
- tables.element_conversions.c.content,
-
- tables.links.c.target,
- tables.links.c.type.label('link_type'),
- #tables.links.c.target.label('link_target'),
- ]
-
- start = time.time()
-
- fts_basis = self.composer.get_table('leftward_link_conversions')
- inserts = self.accessor.select(
- fts_basis,
- cols=select_cols,
- )
- logger.info(f'FTS recreate: pre-index SELECT took {time.time()-start:.2f}s')
-
- start = time.time()
- flattened_blocks = defaultdict(lambda:defaultdict(lambda:None))
- other_results = []
-
- # flatten conversions across supported formats
- # i.e. put "src" and "html" into columns within a single row
- for row in inserts:
- name = row['name']
-
- # remove pure identifier rows
- link_id = row.pop('id')
- _format = row.pop('format')
- content = row.pop('content')
-
- # add new derived rows
- row['src'] = None
- row['html'] = None
-
- if name is None:
- other_results.append(row)
- continue
-
- if flattened_blocks[name][link_id] is None:
- flattened_blocks[name][link_id] = row
-
- nrow = flattened_blocks[name][link_id]
- if _format == 'src':
- nrow['src'] = content
- elif _format == 'html5':
- nrow['html'] = content
-
- inserts = other_results
- for name, link_dict in flattened_blocks.items():
- for link_id, row in link_dict.items():
- inserts.append(row)
-
- # should have at least one result to index
- assert len(inserts) > 0
- cols = inserts[0].keys()
-
- # all keys should match
- assert all([cols == d.keys() for d in inserts])
-
logger.info(f'FTS recreate: insert post-processing took {time.time()-start:.2f}s')
util.db.populate_fts5(
@@ -177,40 +46,3 @@ class FTSManager(Manager):
def migrate(self): pass
-
-
-def recreate_old(self):
- full_table = schema.leftward_block_conversions()
- primitive_tables = schema.primitive_tables()
-
- virtual_inserts = []
- for ptype, table in primitive_tables.items():
- table_rows = db.util.sa_exec_dicts(self.engine, sa.select(table))
- for row in table_rows:
- pdict = util.db.prepare_insert(full_table, row)
- pdict['ptype'] = ptype
-
- virtual_inserts.append(pdict)
-
- select_cols = [
- tables.blocks.c.name, tables.blocks.c.content,
- tables.blocks.c.html, tables.blocks.c.header,
- tables.blocks.c.block_pos, tables.blocks.c.note_name,
-
- tables.links.c.to_file,
-
- tables.notes.c.link,
-
- tables.note_conversion_matter.c.type,
- tables.note_conversion_matter.c.summary,
-
- 'ptype',
- ]
-
- util.db.populate_fts5(
- self.engine,
- 'search',
- columns=select_cols,
- inserts=virtual_inserts,
- )
-
diff --git a/co3/managers/sql.py b/co3/managers/sql.py
index 303a632..53c03fc 100644
--- a/co3/managers/sql.py
+++ b/co3/managers/sql.py
@@ -1,5 +1,6 @@
'''
-Note: Common on insert behavior
+.. admonition:: Common on insert behavior
+
- Tables with unique constraints have been equipped with ``sqlite_on_conflict_unique``
flags, enabling conflicting bulk inserts to replace conflicting rows gracefully. No
need to worry about explicitly handling upserts.
@@ -9,7 +10,8 @@ Note: Common on insert behavior
bulk calls AND allow update/replacements on conflict; the setting above allows this
bulk usage to continue as is.
-Note: Options for insert/update model
+.. admonition:: Options for insert/update model
+
1. Collect all possible update objects, and use a SQLite bulk insert that only updates
if needed (based on modified time, for example). This sounds on the surface like
the right approach since we defer as much to bulk SQL logic, but it's far and away
@@ -50,8 +52,6 @@ from co3.engines import SQLEngine
from co3.manager import Manager
from co3.components import Relation, SQLTable
-#from localsys.reloader.router._base import ChainRouter, Event
-
logger = logging.getLogger(__name__)
@@ -76,24 +76,21 @@ class SQLManager(RelationalManager[SQLTable]):
def __init__(self, *args, **kwargs):
'''
The insert lock is a _reentrant lock_, meaning the same thread can acquire the
- lock again with out deadlocking (simplifying across methods of this class that
+ lock again without deadlocking (simplifying across methods of this class that
need it).
'''
super().__init__(*args, **kwargs)
- self.routers = []
-
- self._router = None
self._insert_lock = threading.RLock()
- @property
- def router(self):
- if self._router is None:
- self._router = ChainRouter(self.routers)
- return self._router
+ def update(self):
+ pass
- def add_router(self, router):
- self.routers.append(router)
+ def migrate(self):
+ pass
+
+ def sync(self):
+ pass
def recreate(
self,
@@ -112,8 +109,6 @@ class SQLManager(RelationalManager[SQLTable]):
metadata.drop_all(engine.manager)
metadata.create_all(engine.manager, checkfirst=True)
- def update(self): pass
-
def insert(
self,
connection,
@@ -167,246 +162,3 @@ class SQLManager(RelationalManager[SQLTable]):
return res_list
- def _file_sync_bools(self):
- synced_bools = []
- fpaths = utils.paths.iter_nested_paths(self.collector.basepath, no_dir=True)
- db_rpath_index = self.database.files.group_by('rpath')()
-
- for fpath in fpaths:
- file = File(fpath, self.collector.basepath)
- db_file = db_rpath_index.get(file.rpath)
-
- file_in_sync = db_file and float(db_file.get('mtime','0')) >= file.mtime
- synced_bools.append(file_in_sync)
-
- return fpaths, synced_bools
-
- def sync(
- self,
- limit=0,
- chunk_time=0,
- dry=False,
- chunk_size_cap=1000,
- print_report=True,
- ):
- '''
- Note:
- Could be dangerous if going from fast file processing to note processing.
- Imagine estimating 1000 iter/sec, then transferring that to the next batch
- when it's more like 0.2 iter/sec. We would lose any chunking. (Luckily, in
- practice, turns out that notes are almost always processed before the huge set
- of nested files lurking and we don't experience this issue.)
- '''
- #filepaths, update_bools = self.collector.file_inserts(dry_run=True)
- #fpaths_to_update = [f for (f, b) in zip(filepaths, update_bools) if not b]
- filepaths, update_bools = self._file_sync_bools()
- fpaths_to_update = [f for (f, b) in zip(filepaths, update_bools) if not b]
-
- def calc_chunk_size(sec_per_file):
- '''
- Chunk cap only applied here; if manually set elsewhere in the method, it's
- intentional e.g., if chunk_time <= 0 and we just want one big iteration.
- '''
- return min(max(int(chunk_time / sec_per_file), 1), chunk_size_cap)
-
- # nothing to do
- if not fpaths_to_update:
- logger.info('Sync has nothing to do, exiting')
- return None
-
- ood_count = len(fpaths_to_update)
- total_fps = len(filepaths)
- ood_prcnt = ood_count/total_fps*100
- logger.info(
- f'Pre-sync scan yielded {ood_count}/{total_fps} ({ood_prcnt:.2f}%) out-of-date files'
- )
-
- if limit <= 0: limit = ood_count
-
- if chunk_time <= 0:
- chunk_size = ood_count
- else:
- chunk_size = calc_chunk_size(5)
-
- dry_inserts = None
- remaining = limit
-
- chunk_time_used = 0
- pbar = tqdm(
- desc=f'Adaptive chunked sync for [limit {limit}]',
- total=remaining,
- )
-
- report = []
- def collect_report():
- report.append({
- 'chunk_size': chunk_limit,
- 'chunk_time_used': chunk_time_used,
- })
-
- with pbar as _:
- while remaining > 0:
- time_pcnt = chunk_time_used / max(chunk_time,1) * 100
- pbar.set_description(
- f'Adaptive chunked sync [size {chunk_size} (max {chunk_size_cap})] '
- f'[prev chunk {chunk_time_used:.2f}s/{chunk_time}s ({time_pcnt:.2f}%)]'
- )
-
- chunk_limit = min(remaining, chunk_size)
- start_idx = limit - remaining
- chunk_fpaths = fpaths_to_update[start_idx:start_idx+chunk_limit]
-
- chunk_time_start = time.time()
- #inserts = self.collector.collect_inserts(
- # chunk_fpaths,
- # skip_updated=False, # saves on DB check given provided list
- #)
-
- # 1) flush synthetic events for the batch through the chained router
- # 2) block until completed and sweep up the collected inserts
- futures = self.router.submit([
- Event(
- endpoint=str(self.collector.basepath),
- name=str(path.relative_to(self.collector.basepath)),
- action=[0xffffffff], # synthetic action to match any flag filters
- ) for path in chunk_fpaths
- ])
-
- for future in tqdm(
- as_completed(futures),
- total=chunk_size,
- desc=f'Awaiting chunk futures [submitted {len(futures)}/{chunk_size}]'
- ):
- #print('Finished future', cfuture)
- try:
- result = future.result()
- except Exception as e:
- logger.warning(f"Sync job failed with exception {e}")
-
- #wait(futures)#, timeout=10)
- inserts = self.collector.collect_inserts()
-
- chunk_time_used = time.time() - chunk_time_start
-
- collect_report()
- #if not inserts: break
-
- if dry:
- if dry_inserts is None:
- dry_inserts = inserts
- else:
- for ik, iv in inserts.items():
- dry_inserts[ik].extend(iv)
- else:
- self.insert(inserts)
-
- remaining -= chunk_limit
-
- # re-calculate chunk size
- sec_per_file = chunk_time_used / chunk_size
- chunk_size = calc_chunk_size(sec_per_file)
-
- #pbar.update(n=limit-remaining)
- pbar.update(n=chunk_limit)
-
- if print_report:
- num_chunks = len(report)
- total_ins = sum(map(lambda c:c['chunk_size'], report))
- avg_chunk = total_ins / num_chunks
- total_time = sum(map(lambda c:c['chunk_time_used'], report))
- avg_time = total_time / num_chunks
- time_match = avg_time / max(chunk_time*100, 1)
-
- print( 'Sync report: ')
- print(f' Total chunks : {num_chunks} ')
- print(f' Total file inserts : {total_ins} / {limit} ')
- print(f' Average chunk size : {avg_chunk:.2f} ')
- print(f' Total time spent : {total_time:.2f}s ')
- print(f' Average time / chunk : {avg_time:.2f}s / {chunk_time}s')
- print(f' Percent time match : {time_match:.2f}% ')
-
- if dry: return dry_inserts
-
- def migrate(self):
- # create MD object representation current DB state
- pre_metadata = sa.MetaData()
- pre_metadata.reflect(bind=self.engine)
-
- table_rows = {}
- for table_name, table in pre_metadata.tables.items():
- # pre-migration rows
- select_results = utils.db.sa_execute(self.engine, sa.select(table))
- named_results = utils.db.named_results(table, select_results)
-
- table_rows[table_name] = named_results
-
-
- #table_update_map = {
- # 'files': (
- # lambda r: File(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
- # ),
- # 'notes': (
- # lambda r: Note(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
- # ),
- # 'note_conversion_matter': (
- # lambda r: Note(Path(NOTEPATH, r['name']), NOTEPATH).get_table_data()
- # ),
- #}
-
- # update schema
- self.recreate()
-
- with self.engine.connect() as connection:
- for table_name, table in tables.metadata.tables.items():
- if table_name not in table_rows: continue
-
- logger.info(f'Migrating table "{table_name}"')
-
- if table_name == 'files':
- update_note_rows = []
-
- for i,note_row in enumerate(table_rows['files']):
- file = File(Path(NOTEPATH, note_row['name']), NOTEPATH)
- u_note_row = utils.db.prepare_insert(
- tables.files,
- file.get_table_data()
- )
- note_row.update({k:v for k,v in u_note_row.items() if v})
- update_note_rows.append(note_row)
-
- table_rows['files'] = update_note_rows
- elif table_name == 'notes':
- update_note_rows = []
-
- for note_row in table_rows['notes']:
- note = Note(Path(NOTEPATH, note_row['name']), NOTEPATH)
- u_note_row = utils.db.prepare_insert(
- tables.notes,
- note.get_table_data()
- )
- note_row.update({k:v for k,v in u_note_row.items() if v})
- update_note_rows.append(note_row)
-
- table_rows['notes'] = update_note_rows
-
- # generic update blueprint for later; now doesn't matter as conversion based
- # inserts require re-converting...so you may as well just do a full re-build.
- #update_rows = []
- #if table_name in table_update_map:
- # for i,row in enumerate(table_rows['files']):
- # update_obj = table_update_map[table_name](row)
-
- # u_row = utils.db.prepare_insert(
- # table,
- # note.get_table_data()
- # )
- # note_row.update({k:v for k,v in u_note_row.items() if v})
- # update_note_rows.append(note_row)
-
- connection.execute(
- sa.insert(table),
- table_rows[table_name]
- )
-
- connection.commit()
-
diff --git a/co3/managers/vss.py b/co3/managers/vss.py
index 4727cf8..16b03fc 100644
--- a/co3/managers/vss.py
+++ b/co3/managers/vss.py
@@ -1,142 +1,16 @@
-from collections import defaultdict
import logging
-import pickle
-import time
-from pathlib import Path
-
-import numpy as np
-from tqdm import tqdm
-import sqlalchemy as sa
from co3.manager import Manager
-from co3 import util
logger = logging.getLogger(__name__)
class VSSManager(Manager):
- def __init__(self, database, engine=None):
- super().__init__(database, engine)
+ def recreate(self): pass
- from localsys.db.databases.core import CoreDatabase
-
- self.core_db = CoreDatabase()
-
- def recreate(self):
- start = time.time()
-
- chunks = []
- chunk_ids = []
- block_map = {}
- for note_name, note_groupby in self.core_db.blocks().items():
- blocks = note_groupby.get('aggregates', [])
- for block in blocks:
- if block.get('format') != 'src': continue
-
- block_name = block.get('name', '')
- block_content = block.get('content', '')
- block_map[block_name] = block_content
-
- block_chunks, chunk_ranges = utils.embed.split_block(block_content)
-
- chunks.extend(block_chunks)
- chunk_ids.extend([f'{block_name}@{r[0]}:{r[1]}' for r in chunk_ranges])
-
- logger.info(f'VSS pre-index SELECT took {time.time()-start:.2f}s')
- start = time.time()
-
- # one-time batched embedding
- chunk_embeddings = self.database.access.embed_chunks(chunks)
-
- logger.info(f'VSS embeddings took {time.time()-start:.2f}s')
- start = time.time()
-
- # aggregate embeddings up the content hierarchy
- zero_embed = lambda: np.atleast_2d(
- np.zeros(self.database.access._embedding_size, dtype='float32')
- )
- block_embedding_map = defaultdict(zero_embed)
- note_embedding_map = defaultdict(zero_embed)
-
- for i, chunk_id in enumerate(chunk_ids):
- at_split = chunk_id.split('@')
- note_name = at_split[0]
- block_name = '@'.join(at_split[:3])
-
- block_embedding_map[block_name] += chunk_embeddings[i]
- note_embedding_map[note_name] += chunk_embeddings[i]
-
- # re-normalize aggregated embeddings
- # > little inefficient but gobbled up asymptotically by chunk processing
- # > could o/w first convert each map to a full 2D np array, then back
- for embed_map in [block_embedding_map, note_embedding_map]:
- for k, agg_embedding in embed_map.items():
- embed_map[k] = utils.embed.unit_row_wise(agg_embedding)
-
- logger.info(f'VSS recreate: pre-index SELECT took {time.time()-start:.2f}s')
-
- # cols = ['rowid', 'chunk_embedding']
- # def pack_inserts(embeddings):
- # return [
- # { cols[0]: i, cols[1]: embedding.tobytes() }
- # for i, embedding in enumerate(embeddings)
- # ]
-
- # chunk_inserts = pack_inserts(chunk_embeddings)
- # block_inserts = pack_inserts(block_embedding_map.values())
- # note_inserts = pack_inserts(note_embedding_map.values())
-
- chunk_ids = chunk_ids
- block_ids = list(block_embedding_map.keys())
- note_ids = list(note_embedding_map.keys())
-
- block_embeddings = np.vstack(list(block_embedding_map.values()))
- note_embeddings = np.vstack(list(note_embedding_map.values()))
-
- blocks = [block_map[b] for b in block_ids]
- notes = note_ids
-
- self.database.access.write_embeddings({
- 'chunks': (chunk_ids, chunk_embeddings, chunks),
- 'blocks': (block_ids, block_embeddings, blocks),
- 'notes' : (note_ids, note_embeddings, notes),
- })
-
- logger.info(f'Post-embedding parsing took {time.time()-start:.2f}s')
-
- # utils.db.create_vss0(
- # self.engine,
- # 'chunks',
- # columns=list(cols),
- # inserts=chunk_inserts,
- # populate=True,
- # reset=True,
- # embedding_size=self._embedding_size,
- # )
-
- def update(self): pass
+ def insert(self): pass
def sync(self): pass
def migrate(self): pass
- def _archive_note_embed(self):
- for block in blocks:
- content = block.get('content')
- if not content: continue
-
- block_chunks = utils.embed.split_block(content)
- chunks.extend(block_chunks)
-
- embeddings = utils.embed.embed_chunks(chunks)
- block_embeddings = utils.embed.map_groups(embeddings, embed_block_map)
-
- self.convert_map['vect'] = block_embeddings
-
- note_embedding = utils.embed.unit_row_wise(
- embeddings.sum(axis=0).reshape(1, -1)
- )
-
- return {
- 'content': note_embedding.tobytes(),
- }
diff --git a/co3/mapper.py b/co3/mapper.py
index 839a501..1c96f2e 100644
--- a/co3/mapper.py
+++ b/co3/mapper.py
@@ -59,6 +59,7 @@ class Mapper[C: Component]:
inserts (hence why we tie Mappers to Schemas one-to-one).
.. admonition:: Dev note
+
the Composer needs reconsideration, or at least its positioning directly in this
class. It may be more appropriate to have at the Schema level, or even just
dissolved altogether if arbitrary named Components can be attached to schemas.