From 0f9582c39199402300a9f884b47b6cc858391a85 Mon Sep 17 00:00:00 2001 From: "Sam G." Date: Sat, 13 Apr 2024 02:14:40 -0700 Subject: [PATCH] clean up Mapper/CO3 conneciton, add Engine abstraction --- co3/accessor.py | 20 +++++- co3/accessors/sql.py | 72 +++++++++++++++++----- co3/collector.py | 4 +- co3/composer.py | 2 +- co3/database.py | 131 ++++++++++++++++++++++++++++++---------- co3/databases/sql.py | 20 +++--- co3/engine.py | 80 ++++++++++++++++++++++++ co3/engines/__init__.py | 51 ++++++++++++++++ co3/manager.py | 21 +++---- co3/mapper.py | 6 +- co3/util/db.py | 82 ------------------------- 11 files changed, 330 insertions(+), 159 deletions(-) create mode 100644 co3/engine.py create mode 100644 co3/engines/__init__.py diff --git a/co3/accessor.py b/co3/accessor.py index c4b98ad..cf08ab2 100644 --- a/co3/accessor.py +++ b/co3/accessor.py @@ -15,7 +15,7 @@ import sqlalchemy as sa from co3.component import Component -class Accessor[C: Component, D: 'Database[C]'](metaclass=ABCMeta): +class Accessor[C: Component](metaclass=ABCMeta): ''' Access wrapper class for complex queries and easy integration with Composer tables. Implements high-level access to things like common constrained SELECT queries. @@ -24,6 +24,20 @@ class Accessor[C: Component, D: 'Database[C]'](metaclass=ABCMeta): engine: SQLAlchemy engine to use for queries. 
Engine is initialized dynamically as a property (based on the config) if not provided ''' - def __init__(self, database: D): - self.database = database + @abstractmethod + def raw_select( + self, + connection, + text: str, + ): + raise NotImplementedError + @abstractmethod + def select( + self, + connection, + component: C, + *args, + **kwargs + ): + raise NotImplementedError diff --git a/co3/accessors/sql.py b/co3/accessors/sql.py index 41ce839..9f824eb 100644 --- a/co3/accessors/sql.py +++ b/co3/accessors/sql.py @@ -45,34 +45,45 @@ from functools import cache import sqlalchemy as sa from co3 import util +from co3.engines import SQLEngine from co3.accessor import Accessor from co3.components import Relation, SQLTable -class RelationalAccessor[R: Relation, D: 'RelationalDatabase[R]'](Accessor[R, D]): - def raw_select(self, sql: str): +class RelationalAccessor[R: Relation](Accessor[R]): + def raw_select( + self, + connection, + text: str + ): + connection.exec raise NotImplementedError def select( self, + connection, relation: R, - cols = None, + attributes = None, where = None, distinct_on = None, order_by = None, limit = 0, + mappings : bool = False, + include_cols : bool = False, ): raise NotImplementedError def select_one( self, + connection, relation : R, - cols = None, + attributes = None, where = None, mappings : bool = False, include_cols : bool = False, ): - res = self.select(relation, cols, where, mappings, include_cols, limit=1) + res = self.select( + relation, attributes, where, mappings, include_cols, limit=1) if include_cols and len(res[0]) > 0: return res[0][0], res[1] @@ -83,7 +94,7 @@ class RelationalAccessor[R: Relation, D: 'RelationalDatabase[R]'](Accessor[R, D] return None -class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']): +class SQLAccessor(RelationalAccessor[SQLTable]): def raw_select( self, sql, @@ -99,15 +110,15 @@ class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']): def select( self, - table: 
SQLTable, - cols = None, + table: SQLTable, + columns = None, where = None, distinct_on = None, order_by = None, limit = 0, mappings = False, include_cols = False, - ): + ) -> list[dict|sa.Mapping]: ''' Perform a SELECT query against the provided table-like object (see `check_table()`). @@ -122,14 +133,14 @@ class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']): (no aggregation methods accepted) order_by: column to order results by (can use .desc() to order by descending) + + Returns: + Statement results, either as a list of 1) SQLAlchemy Mappings, or 2) converted + dictionaries ''' if where is None: where = sa.true() - res_method = utils.db.sa_exec_dicts - if mappings: - res_method = utils.db.sa_exec_mappings - stmt = sa.select(table).where(where) if cols is not None: stmt = sa.select(*cols).select_from(table).where(where) @@ -143,4 +154,37 @@ class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']): if limit > 0: stmt = stmt.limit(limit) - return res_method(self.engine, stmt, include_cols=include_cols) + res = SQLEngine._execute(connection, statement, include_cols=include_cols) + + if mappings: + return res.mappings().all() + else: + return self.result_dicts(res) + + @staticmethod + def result_dicts(results, query_cols=None): + ''' + Parse SQLAlchemy results into Python dicts. Leverages mappings to associate full + column name context. + + If `query_cols` is provided, their implicit names will be used for the keys of the + returned dictionaries. This information is not available under CursorResults and thus + must be provided separately. This will yield results like the following: + + [..., {'table1.col':, 'table2.col':, ...}, ...] + + Instead of the automatic mapping names: + + [..., {'col':, 'col_1':, ...}, ...] + + which can make accessing certain results a little more intuitive. 
+ ''' + result_mappings = results.mappings().all() + + if query_cols: + return [ + { str(c):r[c] for c in query_cols } + for r in result_mappings + ] + + return [dict(r) for r in result_mappings] diff --git a/co3/collector.py b/co3/collector.py index 8ea228b..1d473e1 100644 --- a/co3/collector.py +++ b/co3/collector.py @@ -22,8 +22,6 @@ Note: from pathlib import Path from collections import defaultdict import logging -import importlib -import subprocess from uuid import uuid4 import sqlalchemy as sa @@ -35,7 +33,7 @@ from co3.component import Component logger = logging.getLogger(__name__) -class Collector[C: Component, M: 'Mapper[C]']: +class Collector[C: Component]: def __init__(self, schema: Schema[C]): self.schema = schema diff --git a/co3/composer.py b/co3/composer.py index 4456cb5..2e476b8 100644 --- a/co3/composer.py +++ b/co3/composer.py @@ -43,7 +43,7 @@ def register_table(table_name=None): return func return decorator -class Composer[C: Component, M: 'Mapper[C]']: +class Composer[C: Component]: ''' Base composer wrapper for table groupings. diff --git a/co3/database.py b/co3/database.py index 7952296..2026d6f 100644 --- a/co3/database.py +++ b/co3/database.py @@ -7,26 +7,100 @@ objects. The Database type hierarchy attempts to be exceedingly general; SQL-derivatives should subclass from the RelationalDatabase subtype, for example, which itself becomes a new -generic via type dependence on Relation. +generic via a type dependence on Relation. + +While relying on many constituent pieces, Databases intend to provide all needed objects +under one roof. This includes the Engine (opens up connections to the database), Accessors +(running select-like queries on DB data), Managers (updating DB state with sync +insert-like actions), and Indexers (systematically caching Accessor queries). Generalized +behavior is supported by explicitly leveraging the individual components.
For example, + +``` +with db.engine.connect() as connection: + db.access.select( + connection, + + ) + db.manager.insert( + connection, + component, + data + ) +``` + +The Database also supports a few directly callable methods for simplified interaction. +These methods manage a connection context internally, passing them through the way they +might otherwise be handled explicitly, as seen above. + +``` +db.select() + +db.insert(, data) +``` + +Dev note: on explicit connection contexts + Older models supported Accessors/Managers that housed their own Engine instances, and + when performing actions like `insert`, the Engine would be passed all the way through + until a Connection could be spawned, and in that context the single action would be + made. This model forfeits a lot of connection control, preventing multiple actions + under a single connection. + + The newer model now avoids directly allowing Managers/Accessors access to their own + engines, and instead they expose methods that explicitly require Connection objects. + This means a user can invoke these methods in their own Connection contexts (seen + above) and group up operations as they please, reducing overhead. The Database then + wraps up a few single-operation contexts where outer connection control is not needed. ''' import logging -from typing import Self from co3.accessor import Accessor from co3.composer import Composer from co3.manager import Manager from co3.indexer import Indexer +from co3.engine import Engine logger = logging.getLogger(__name__) class Database[C: Component]: - accessor: type[Accessor[C, Self]] = Accessor[C, Self] - manager: type[Manager[C, Self]] = Manager[C, Self] + ''' + Generic Database definition - def __init__(self, resource): + Generic to both a Component (C), and an Engine resource type (R). The Engine's + generic openness must be propagated here, as it's intended to be fully abstracted away + under the Database roof. 
Note that we cannot explicitly use an Engine type in its + place, as it obscures its internal resource type dependence when we need it for + hinting here in `__init__`. + + Development TODO list: + - Decide on official ruling for assigning Schema objects, and verifying any + attempted Component-based actions (e.g., inserts, selects) to belong to or be a + composition of Components within an attached Schema. Reasons for: helps complete + the sense of a "Database" here programmatically, incorporating a more + structurally accurate representation of allowed operations, and prevent possible + attribute and type collisions. Reasons against: generally not a huge concern to + align Schemas as transactions will roll back, broadly increases a bit of bulk, + and users are often expected to know which components belong to a particular DB. + Leaning more to **for**, and would only apply to the directly supported method + passthroughs (and thus would have no impact on independent methods like + `Accessor.raw_select`). Additionally, even if component clashes don't pose + serious risk, it can be helpful to systematically address the cases where a + misalignment is occurring (by having helpful `verify` methods that can be run + before any actions). + ''' + _accessor_cls: type[Accessor[C]] = Accessor[C] + _manager_cls: type[Manager[C]] = Manager[C] + _engine_cls: type[Engine] = Engine + + def __init__(self, *engine_args, **engine_kwargs): ''' + Parameters: + engine_args: positional arguments to pass on to the Engine object during + instantiation + engine_kwargs: keyword arguments to pass on to the Engine object during + instantiation + Variables: _local_cache: a database-local property store for ad-hoc CacheBlock-esque methods, that are nevertheless _not_ query/group-by responses to @@ -34,37 +108,32 @@ class Database[C: Component]: this cache and check for existence of stored results; the cache state must be managed globally.
''' - self.resource = resource + self.engine = self._engine_cls(*engine_args, **engine_kwargs) - self._access = self.accessor(self) - self._manage = self.manager(self) + self.accessor = self._accessor_cls() + self.manager = self._manager_cls() + self.indexer = Indexer(self.accessor) - self._index = Indexer(self._access) self._local_cache = {} + self._reset_cache = False - self.reset_cache = False + def select(self, component: C, *args, **kwargs): + with self.engine.connect() as connection: + return self.accessor.select( + connection, + component, + *args, + **kwargs + ) - @property - def engine(self): - ''' - Database property to provide a singleton engine for DB interaction, initializing - the database if it doesn't already exist. - - TODO: figure out thread safety across engines and/or connection. Any issue with - hanging on to the same engine instance for the Database instance? - ''' - raise NotImplementedError - - def connect(self): - self.engine.connect() - - @property - def access(self): - return self._access - - @property - def compose(self): - return self._compose + def insert(self, component: C, *args, **kwargs): + with self.engine.connect() as connection: + return self.accessor.insert( + connection, + component, + *args, + **kwargs + ) @property def index(self): diff --git a/co3/databases/sql.py b/co3/databases/sql.py index 56a3f77..fe07c3d 100644 --- a/co3/databases/sql.py +++ b/co3/databases/sql.py @@ -1,27 +1,27 @@ -from typing import Self - -from co3.database import Database +from co3.database import Database, Engine from co3.accessors.sql import RelationalAccessor, SQLAccessor from co3.managers.sql import RelationalManager, SQLManager +from co3.engines import SQLEngine from co3.components import Relation, SQLTable -class RelationalDatabase[R: Relation](Database): +class RelationalDatabase[C: RelationR](Database): ''' accessor/manager assignments satisfy supertype's type settings; - `TabluarAccessor[Self, R]` is of type `type[RelationalAccessor[Self, 
R]]` + `TabluarAccessor[Self, C]` is of type `type[RelationalAccessor[Self, C]]` (and yes, `type[]` specifies that the variable is itself being set to a type or a class, rather than a satisfying _instance_) ''' - accessor: type[RelationalAccessor[Self, R]] = RelationalAccessor[Self, R] - manager: type[RelationalManager[Self, R]] = RelationalManager[Self, R] + _accessor_cls: type[RelationalAccessor[C]] = RelationalAccessor[C] + _manager_cls: type[RelationalManager[C]] = RelationalManager[C] -class SQLDatabase[R: SQLTable](RelationalDatabase[R]): - accessor = SQLAccessor - manager = SQLManager +class SQLDatabase[C: SQLTable](RelationalDatabase[C]): + _accessor_cls = SQLAccessor + _manager_cls = SQLManager + _engine_cls = SQLEngine class SQLiteDatabase(SQLDatabase[SQLTable]): diff --git a/co3/engine.py b/co3/engine.py new file mode 100644 index 0000000..5e4cb5e --- /dev/null +++ b/co3/engine.py @@ -0,0 +1,80 @@ +import logging +from contextlib import contextmanager + + +logger = logging.getLogger(__name__) + +class Engine: + ''' + Engine base class. Used primarily as a Database connection manager, with general + methods that can be defined for several kinds of value stores. + + Note that this is where the connection hierarchy is supposed to stop. While some + derivative Engines, like SQLEngine, mostly just wrap another engine-like object, this + is not the rule. That is, inheriting Engine subtypes shouldn't necessarily expect to + rely on another object per se, and if such an object is required, _this_ is the class + meant to be the skeleton that supports its creation (and not merely a wrapper for some + other type, although it may appear that way when such a type is in fact readily + available). + + Dev note: why is this object necessary? + More specifically, why not just have all the functionality here packed into the + Database by default? The answer is that, realistically, it could be.
The type + separation between the Engine and Database is perhaps the least substantiated in + CO3. That being said, it still serves a purpose: to make composition of subtypes + easier. The Engine is a very lightweight abstraction, but some Engine subtypes + (e.g., FileEngines) may be used across several sibling Database types. In this + case, we'd have to repeat the Engine-related functionality for such sibling types. + Depending instead on a singular, outside object keeps things DRY. If Databases and + Engines were uniquely attached type-wise 1-to-1 (i.e., unique Engine type per + unique Database type), a separate object here would indeed be a waste, as is the + case for any compositional typing scheme. + + Dev note: + This class is now non-generic. It was originally conceived as a generic, depending + on a "resource spec type" to help define expected types on initialization. + This simply proved too messy, required generic type propagation to the Database + definition, and muddied the otherwise simple args and kwargs forwarding for + internal manager creation. + ''' + def __init__(self, *manager_args, **manager_kwargs): + self._manager = None + self._manager_args = manager_args + self._manager_kwargs = manager_kwargs + + @property + def manager(self): + ''' + Return Engine's singleton manager, initializing when the first call is made. + ''' + if self._manager is None: + self._manager = self._create_manager() + + return self._manager + + def _create_manager(self): + ''' + Create the session manager needed for connection contexts. This method is called + once by the `.manager` property function when it is first accessed. This method is + separated to isolate the creation logic in inheriting types. + + Note that this method takes no explicit arguments.
This is primarily because the + standard means of invocation (the manager property) is meant to remain generally + useful here in the base class, and can't be aware of any specific properties that + might be extracted in subtype initialization. As a result, we don't even try to + pass args, although it would just look like a forwarding of the readily available manager + args and kwargs anyhow. As such, this method should make direct use of these + instance variables as needed. + ''' + raise NotImplementedError + + @contextmanager + def connect(self, timeout=None): + ''' + Open a connection to the database specified by the resource. Exactly what the + returned connection looks like remains relatively unconstrained given the wide + variety of possible database interactions. This function should be invoked in + with-statement contexts, constituting an "interaction session" with the database + (i.e., allowing several actions to be performed using the same connection). + ''' + raise NotImplementedError diff --git a/co3/engines/__init__.py b/co3/engines/__init__.py new file mode 100644 index 0000000..81d810e --- /dev/null +++ b/co3/engines/__init__.py @@ -0,0 +1,51 @@ +import sqlalchemy as sa + +from co3.engine import Engine + + +class SQLEngine(Engine): + def __init__(self, url: str | sa.URL, **kwargs): + super().__init__(url, **kwargs) + + def _create_manager(self): + return sa.create_engine(*self.manager_args, self.manager_kwargs) + + @contextmanager + def connect(self, timeout=None): + return self.manager.connect() + + @staticmethod + def _execute( + connection, + statement, + bind_params=None, + include_cols=False, + ): + ''' + Execute a general SQLAlchemy statement, optionally binding provided parameters and + returning associated column names.
+ + Parameters: + connection: database connection instance + statement: SQLAlchemy statement + bind_params: + include_cols: whether to return + ''' + res = connection.execute(statement, bind_params) + + if include_cols: + cols = list(res.mappings().keys()) + return res, cols + + return res + + @staticmethod + def _exec_explicit(connection, statement, bind_params=None): + trans = connection.begin() # start a new transaction explicitly + try: + result = connection.execute(statement, bind_params) + trans.commit() # commit the transaction explicitly + return result + except: + trans.rollback() # rollback the transaction explicitly + raise diff --git a/co3/manager.py b/co3/manager.py index e69270b..04a6ec0 100644 --- a/co3/manager.py +++ b/co3/manager.py @@ -8,10 +8,9 @@ from pathlib import Path from abc import ABCMeta, abstractmethod from co3.schema import Schema -#from co3.database import Database -class Manager[C: Component, D: 'Database[C]'](metaclass=ABCMeta): +class Manager[C: Component](metaclass=ABCMeta): ''' Management wrapper class for table groupings. @@ -19,21 +18,19 @@ class Manager[C: Component, D: 'Database[C]'](metaclass=ABCMeta): most operations need to be coordinated across tables. A few common operations are wrapped up in this class to then also be mirrored for the FTS counterparts. ''' - def __init__(self, database: D): - self.database = database - @abstractmethod def recreate(self, schema: Schema[C]): raise NotImplementedError + @abstractmethod + def insert(self, component: C, *args, **kwargs): + raise NotImplementedError + + @abstractmethod + def sync(self): + raise NotImplementedError + @abstractmethod def migrate(self): raise NotImplementedError - @abstractmethod - def insert(self): - raise NotImplementedError - - @abstractmethod - def sync(self): - raise NotImplementedError diff --git a/co3/mapper.py b/co3/mapper.py index e4ff2b9..9cb3d18 100644 --- a/co3/mapper.py +++ b/co3/mapper.py @@ -29,7 +29,7 @@ Development log: hierarchy). 
As such, to fully collect from a type, the Mapper needs to leave registration open to various types, not just those part of the same hierarchy. ''' -from typing import Callable, Self +from typing import Callable from collections import defaultdict from co3.co3 import CO3 @@ -59,8 +59,8 @@ class Mapper[C: Component]: this class. It may be more appropriate to have at the Schema level, or even just dissolved altogether if arbitrary named Components can be attached to schemas. ''' - _collector_cls: type[Collector[C, Self]] = Collector[C, Self] - _composer_cls: type[Composer[C, Self]] = Composer[C, Self] + _collector_cls: type[Collector[C]] = Collector[C] + _composer_cls: type[Composer[C]] = Composer[C] def __init__(self, schema: Schema[C]): ''' diff --git a/co3/util/db.py b/co3/util/db.py index 952e42e..3e8c961 100644 --- a/co3/util/db.py +++ b/co3/util/db.py @@ -44,87 +44,6 @@ def named_results(table, results): for r in results ] -# RAW CURSOR-RESULT MANIPULATION -- SEE SA-PREFIXED METHODS FOR CONN-WRAPPED COUNTERPARTS -def result_mappings(results): - return results.mappings() - -def result_mappings_all(results): - return result_mappings(results).all() - -def result_dicts(results, query_cols=None): - ''' - Parse SQLAlchemy results into Python dicts. Leverages mappings to associate full - column name context. - - If `query_cols` is provided, their implicit names will be used for the keys of the - returned dictionaries. This information is not available under CursorResults and thus - must be provided separately. This will yield results like the following: - - [..., {'table1.col':, 'table2.col':, ...}, ...] - - Instead of the automatic mapping names: - - [..., {'col':, 'col_1':, ...}, ...] - - which can make accessing certain results a little more intuitive. 
- ''' - if query_cols: - return [ - { str(c):r[c] for c in query_cols } - for r in result_mappings_all(results) - ] - - return [dict(r) for r in result_mappings_all(results)] - -def sa_execute(engine, stmt, bind_params=None, include_cols=False): - ''' - Simple single-statement execution in a with-block - ''' - with engine.connect() as conn: - res = conn.execute(stmt, bind_params) - - if include_cols: - cols = list(res.mappings().keys()) - return res, cols - - return res - -def sa_exec_mappings(engine, stmt, bind_params=None, include_cols=False): - ''' - All mappings fetched inside of connect context, safe to access outside - ''' - with engine.connect() as conn: - res = conn.execute(stmt, bind_params) - mappings = result_mappings_all(res) - - if include_cols: - cols = list(res.mappings().keys()) - return mappings, cols - - return mappings - -def sa_exec_dicts(engine, stmt, bind_params=None, include_cols=False): - with engine.connect() as conn: - res = conn.execute(stmt, bind_params) - dicts = result_dicts(res) - - if include_cols: - cols = list(res.mappings().keys()) - return dicts, cols - - return dicts - -def sa_exec_explicit(engine, stmt, bind_params=None): - with engine.connect() as conn: - trans = conn.begin() # start a new transaction explicitly - try: - result = conn.execute(stmt, bind_params) - trans.commit() # commit the transaction explicitly - return result - except: - trans.rollback() # rollback the transaction explicitly - raise - def deferred_fkey(target, **kwargs): return sa.ForeignKey( target, @@ -140,7 +59,6 @@ def deferred_cd_fkey(target, **kwargs): ''' return deferred_fkey(target, ondelete='CASCADE', **kwargs) - def get_column_names_str_table(engine, table: str): col_sql = f'PRAGMA table_info({table});' with engine.connect() as connection: