clean up Mapper/CO3 conneciton, add Engine abstraction

This commit is contained in:
Sam G. 2024-04-13 02:14:40 -07:00
parent 753b67a51f
commit f5a0f21e64
11 changed files with 330 additions and 159 deletions

View File

@ -15,7 +15,7 @@ import sqlalchemy as sa
from co3.component import Component
class Accessor[C: Component, D: 'Database[C]'](metaclass=ABCMeta):
class Accessor[C: Component](metaclass=ABCMeta):
'''
Access wrapper class for complex queries and easy integration with Composer tables.
Implements high-level access to things like common constrained SELECT queries.
@ -24,6 +24,20 @@ class Accessor[C: Component, D: 'Database[C]'](metaclass=ABCMeta):
engine: SQLAlchemy engine to use for queries. Engine is initialized dynamically as
a property (based on the config) if not provided
'''
def __init__(self, database: D):
self.database = database
@abstractmethod
def raw_select(
self,
connection,
text: str,
):
raise NotImplementedError
@abstractmethod
def select(
self,
connection,
component: C,
*args,
**kwargs
):
raise NotImplementedError

View File

@ -45,34 +45,45 @@ from functools import cache
import sqlalchemy as sa
from co3 import util
from co3.engines import SQLEngine
from co3.accessor import Accessor
from co3.components import Relation, SQLTable
class RelationalAccessor[R: Relation, D: 'RelationalDatabase[R]'](Accessor[R, D]):
def raw_select(self, sql: str):
class RelationalAccessor[R: Relation](Accessor[R]):
def raw_select(
self,
connection,
text: str
):
connection.exec
raise NotImplementedError
def select(
self,
connection,
relation: R,
cols = None,
attributes = None,
where = None,
distinct_on = None,
order_by = None,
limit = 0,
mappings : bool = False,
include_cols : bool = False,
):
raise NotImplementedError
def select_one(
self,
connection,
relation : R,
cols = None,
attributes = None,
where = None,
mappings : bool = False,
include_cols : bool = False,
):
res = self.select(relation, cols, where, mappings, include_cols, limit=1)
res = self.select(
relation, attributes, where, mappings, include_cols, limit=1)
if include_cols and len(res[0]) > 0:
return res[0][0], res[1]
@ -83,7 +94,7 @@ class RelationalAccessor[R: Relation, D: 'RelationalDatabase[R]'](Accessor[R, D]
return None
class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']):
class SQLAccessor(RelationalAccessor[SQLTable]):
def raw_select(
self,
sql,
@ -99,15 +110,15 @@ class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']):
def select(
self,
table: SQLTable,
cols = None,
table: SQLTable,
columns = None,
where = None,
distinct_on = None,
order_by = None,
limit = 0,
mappings = False,
include_cols = False,
):
) -> list[dict|sa.Mapping]:
'''
Perform a SELECT query against the provided table-like object (see
`check_table()`).
@ -122,14 +133,14 @@ class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']):
(no aggregation methods accepted)
order_by: column to order results by (can use <col>.desc() to order
by descending)
Returns:
Statement results, either as a list of 1) SQLAlchemy Mappings, or 2) converted
dictionaries
'''
if where is None:
where = sa.true()
res_method = utils.db.sa_exec_dicts
if mappings:
res_method = utils.db.sa_exec_mappings
stmt = sa.select(table).where(where)
if cols is not None:
stmt = sa.select(*cols).select_from(table).where(where)
@ -143,4 +154,37 @@ class SQLAccessor(RelationalAccessor[SQLTable, 'SQLDatabase[SQLTable]']):
if limit > 0:
stmt = stmt.limit(limit)
return res_method(self.engine, stmt, include_cols=include_cols)
res = SQLEngine._execute(connection, statement, include_cols=include_cols)
if mappings:
return res.mappings().all()
else:
return self.result_dicts(res)
@staticmethod
def result_dicts(results, query_cols=None):
'''
Parse SQLAlchemy results into Python dicts. Leverages mappings to associate full
column name context.
If `query_cols` is provided, their implicit names will be used for the keys of the
returned dictionaries. This information is not available under CursorResults and thus
must be provided separately. This will yield results like the following:
[..., {'table1.col':<value>, 'table2.col':<value>, ...}, ...]
Instead of the automatic mapping names:
[..., {'col':<value>, 'col_1':<value>, ...}, ...]
which can make accessing certain results a little more intuitive.
'''
result_mappings = results.mappings().all()
if query_cols:
return [
{ str(c):r[c] for c in query_cols }
for r in result_mappings
]
return [dict(r) for r in result_mappings]

View File

@ -22,8 +22,6 @@ Note:
from pathlib import Path
from collections import defaultdict
import logging
import importlib
import subprocess
from uuid import uuid4
import sqlalchemy as sa
@ -35,7 +33,7 @@ from co3.component import Component
logger = logging.getLogger(__name__)
class Collector[C: Component, M: 'Mapper[C]']:
class Collector[C: Component]:
def __init__(self, schema: Schema[C]):
self.schema = schema

View File

@ -43,7 +43,7 @@ def register_table(table_name=None):
return func
return decorator
class Composer[C: Component, M: 'Mapper[C]']:
class Composer[C: Component]:
'''
Base composer wrapper for table groupings.

View File

@ -7,26 +7,100 @@ objects.
The Database type hierarchy attempts to be exceedingly general; SQL-derivatives should
subclass from the RelationalDatabase subtype, for example, which itself becomes a new
generic via type dependence on Relation.
generic via a type dependence on Relation.
While relying no many constituent pieces, Databases intend to provide all needed objects
under one roof. This includes the Engine (opens up connections to the database), Accessors
(running select-like queries on DB data), Managers (updating DB state with sync
insert-like actions), and Indexers (systematically caching Accessor queries). Generalized
behavior is supported by explicitly leveraging the individual components. For example,
```
with db.engine.connect() as connection:
db.access.select(
connection,
<query>
)
db.manager.insert(
connection,
component,
data
)
```
The Database also supports a few directly callable methods for simplified interaction.
These methods manage a connection context internally, passing them through the way they
might otherwise be handled explicitly, as seen above.
```
db.select(<query>)
db.insert(<query>, data)
```
Dev note: on explicit connection contexts
Older models supported Accessors/Managers that housed their own Engine instances, and
when performing actions like `insert`, the Engine would be passed all the way through
until a Connection could be spawned, and in that context the single action would be
made. This model forfeits a lot of connection control, preventing multiple actions
under a single connection.
The newer model now avoids directly allowing Managers/Accessors access to their own
engines, and instead they expose methods that explicitly require Connection objects.
This means a user can invoke these methods in their own Connection contexts (seen
above) and group up operations as they please, reducing overhead. The Database then
wraps up a few single-operation contexts where outer connection control is not needed.
'''
import logging
from typing import Self
from co3.accessor import Accessor
from co3.composer import Composer
from co3.manager import Manager
from co3.indexer import Indexer
from co3.engine import Engine
logger = logging.getLogger(__name__)
class Database[C: Component]:
accessor: type[Accessor[C, Self]] = Accessor[C, Self]
manager: type[Manager[C, Self]] = Manager[C, Self]
'''
Generic Database definition
def __init__(self, resource):
Generic to both a Component (C), and an Engine resource type (R). The Engine's
generic openness must be propagated here, as it's intended to be fully abstracted away
under the Database roof. Note that we cannot explicitly use an Engine type in its
place, as it obscures its internal resource type dependence when we need it for
hinting here in `__init__`.
Development TODO list:
- Decide on official ruling for assigning Schema objects, and verifying any
attempted Component-based actions (e.g., inserts, selects) to belong to or be a
composition of Components within an attached Schema. Reasons for: helps complete
the sense of a "Database" here programmatically, incorporating a more
structurally accurate representation of allowed operations, and prevent possible
attribute and type collisions. Reasons against: generally not a huge concern to
align Schemas as transactions will rollback, broadly increases a bit of bulk,
and users often expected know which components belong to a particular DB.
Leaning more to **for**, and would only apply to the directly supported method
passthroughs (and thus would have no impact on independent methods like
`Accessor.raw_select`). Additionally, even if component clashes don't pose
serious risk, it can be helpful to systematically address the cases where a
misalignment is occurring (by having helpful `verify` methods that can be ran
before any actions).
'''
_accessor_cls: type[Accessor[C]] = Accessor[C]
_manager_cls: type[Manager[C]] = Manager[C]
_engine_cls: type[Engine] = Engine
def __init__(self, *engine_args, **engine_kwargs):
'''
Parameters:
engine_args: positional arguments to pass on to the Engine object during
instantiation
engine_kwargs: keyword arguments to pass on to the Engine object during
instantiation
Variables:
_local_cache: a database-local property store for ad-hoc CacheBlock-esque
methods, that are nevertheless _not_ query/group-by responses to
@ -34,37 +108,32 @@ class Database[C: Component]:
this cache and check for existence of stored results; the cache
state must be managed globally.
'''
self.resource = resource
self.engine = self._engine_cls(*engine_args, **engine_kwargs)
self._access = self.accessor(self)
self._manage = self.manager(self)
self.accessor = self._accessor_cls()
self.manager = self._manager_cls()
self.indexer = Indexer(self.accessor)
self._index = Indexer(self._access)
self._local_cache = {}
self._reset_cache = False
self.reset_cache = False
def select(self, component: C, *args, **kwargs):
with self.engine.connect() as connection:
return self.accessor.select(
connection,
component,
*args,
**kwargs
)
@property
def engine(self):
'''
Database property to provide a singleton engine for DB interaction, initializing
the database if it doesn't already exist.
TODO: figure out thread safety across engines and/or connection. Any issue with
hanging on to the same engine instance for the Database instance?
'''
raise NotImplementedError
def connect(self):
self.engine.connect()
@property
def access(self):
return self._access
@property
def compose(self):
return self._compose
def insert(self, component: C, *args, **kwargs):
with self.engine.connect() as connection:
return self.accessor.insert(
connection,
component,
*args,
**kwargs
)
@property
def index(self):

View File

@ -1,27 +1,27 @@
from typing import Self
from co3.database import Database
from co3.database import Database, Engine
from co3.accessors.sql import RelationalAccessor, SQLAccessor
from co3.managers.sql import RelationalManager, SQLManager
from co3.engines import SQLEngine
from co3.components import Relation, SQLTable
class RelationalDatabase[R: Relation](Database):
class RelationalDatabase[C: RelationR](Database):
'''
accessor/manager assignments satisfy supertype's type settings;
`TabluarAccessor[Self, R]` is of type `type[RelationalAccessor[Self, R]]`
`TabluarAccessor[Self, C]` is of type `type[RelationalAccessor[Self, C]]`
(and yes, `type[]` specifies that the variable is itself being set to a type or a
class, rather than a satisfying _instance_)
'''
accessor: type[RelationalAccessor[Self, R]] = RelationalAccessor[Self, R]
manager: type[RelationalManager[Self, R]] = RelationalManager[Self, R]
_accessor_cls: type[RelationalAccessor[C]] = RelationalAccessor[C]
_manager_cls: type[RelationalManager[C]] = RelationalManager[C]
class SQLDatabase[R: SQLTable](RelationalDatabase[R]):
accessor = SQLAccessor
manager = SQLManager
class SQLDatabase[C: SQLTable](RelationalDatabase[C]):
_accessor_cls = SQLAccessor
_manager_cls = SQLManager
_engine_cls = SQLEngine
class SQLiteDatabase(SQLDatabase[SQLTable]):

80
co3/engine.py Normal file
View File

@ -0,0 +1,80 @@
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
class Engine:
'''
Engine base class. Used primarily as a Database connection manager, with general
methods that can be defined for several kinds of value stores.
Note that this is where the connection hierarchy is supposed to stop. While some
derivative Engines, like SQLEngine, mostly just wrap another engine-like object, this
is not the rule. That is, inheriting Engine subtypes shouldn't necessarily expect to
rely on another object per se, and if such an object is required, _this_ is the class
is meant to be skeleton to supports its creation (and not merely a wrapper for some
other type, although it may appear that way when such a type is in fact readily
available).
Dev note: why is this object necessary?
More specifically, why not just have all the functionality here packed into the
Database by default? The answer is that, realistically, it could be. The type
separation between the Engine and Database is perhaps the least substantiated in
CO3. That being said, it still serves a purpose: to make composition of subtypes
easier. The Engine is a very lightweight abstraction, but some Engine subtypes
(e.g., FileEngines) may be used across several sibling Database types. In this
case, we'd have to repeat the Engine-related functionality for such sibling types.
Depending instead on a singular, outside object keeps things DRY. If Databases and
Engines were uniquely attached type-wise 1-to-1 (i.e., unique Engine type per
unique Database type), a separate object here would indeed be a waste, as is the
case for any compositional typing scheme.
Dev note:
This class is now non-generic. It was originally conceived as a generic, depending
on a "resource spec type" to be help define expected types on initialization.
This simply proved too messy, required generic type propagation to the Database
definition, and muddied the otherwise simple args and kwargs forwarding for
internal manager creation.
'''
def __init__(self, *manager_args, **manager_kwargs):
self._manager = None
self._manager_args = manager_args
self._manager_kwargs = manager_kwargs
@property
def manager(self):
'''
Return Engine's singleton manager, initializing when the first call is made.
'''
if self._manager is None:
self._manager = self._create_manager()
return self._manager
def _create_manager(self):
'''
Create the session manager needed for connection contexts. This method is called
once by the `.manager` property function when it is first accessed. This method is
separated to isolate the creation logic in inheriting types.
Note that this method takes no explicit arguments. This is primarily because the
standard means of invocation (the manager property) is meant to remain generally
useful here in the base class, and can't be aware of any specific properties that
might be extracted in subtype initialization. As a result, we don't even try to
pass args, although it would just look like a forwarding of the readily manager
args and kwargs anyhow. As such, this method should make direct use of these
instance variables as needed.
'''
raise NotImplementedError
@contextmanager
def connect(self, timeout=None):
'''
Open a connection to the database specified by the resource. Exactly what the
returned connection looks like remains relatively unconstrained given the wide
variety of possible database interactions. This function should be invoked in
with-statement contexts, constituting an "interaction session" with the database
(i.e., allowing several actions to be performed using the same connection).
'''
raise NotImplementedError

51
co3/engines/__init__.py Normal file
View File

@ -0,0 +1,51 @@
import sqlalchemy as sa
from co3.engine import Engine
class SQLEngine(Engine):
def __init__(self, url: str | sa.URL, **kwargs):
super().__init__(url, **kwargs)
def _create_manager(self):
return sa.create_engine(*self.manager_args, self.manager_kwargs)
@contextmanager
def connect(self, timeout=None):
return self.manager.connect()
@staticmethod
def _execute(
connection,
statement,
bind_params=None,
include_cols=False,
):
'''
Execute a general SQLAlchemy statement, optionally binding provided parameters and
returning associated column names.
Parameters:
connection: database connection instance
statement: SQLAlchemy statement
bind_params:
include_cols: whether to return
'''
res = connection.execute(statement, bind_params)
if include_cols:
cols = list(res.mappings().keys())
return res, cols
return res
@staticmethod
def _exec_explicit(connection, statement, bind_params=None):
trans = connection.begin() # start a new transaction explicitly
try:
result = connection.execute(statement, bind_params)
trans.commit() # commit the transaction explicitly
return result
except:
trans.rollback() # rollback the transaction explicitly
raise

View File

@ -8,10 +8,9 @@ from pathlib import Path
from abc import ABCMeta, abstractmethod
from co3.schema import Schema
#from co3.database import Database
class Manager[C: Component, D: 'Database[C]'](metaclass=ABCMeta):
class Manager[C: Component](metaclass=ABCMeta):
'''
Management wrapper class for table groupings.
@ -19,21 +18,19 @@ class Manager[C: Component, D: 'Database[C]'](metaclass=ABCMeta):
most operations need to be coordinated across tables. A few common operations are
wrapped up in this class to then also be mirrored for the FTS counterparts.
'''
def __init__(self, database: D):
self.database = database
@abstractmethod
def recreate(self, schema: Schema[C]):
raise NotImplementedError
@abstractmethod
def insert(self, component: C, *args, **kwargs):
raise NotImplementedError
@abstractmethod
def sync(self):
raise NotImplementedError
@abstractmethod
def migrate(self):
raise NotImplementedError
@abstractmethod
def insert(self):
raise NotImplementedError
@abstractmethod
def sync(self):
raise NotImplementedError

View File

@ -29,7 +29,7 @@ Development log:
hierarchy). As such, to fully collect from a type, the Mapper needs to leave
registration open to various types, not just those part of the same hierarchy.
'''
from typing import Callable, Self
from typing import Callable
from collections import defaultdict
from co3.co3 import CO3
@ -59,8 +59,8 @@ class Mapper[C: Component]:
this class. It may be more appropriate to have at the Schema level, or even just
dissolved altogether if arbitrary named Components can be attached to schemas.
'''
_collector_cls: type[Collector[C, Self]] = Collector[C, Self]
_composer_cls: type[Composer[C, Self]] = Composer[C, Self]
_collector_cls: type[Collector[C]] = Collector[C]
_composer_cls: type[Composer[C]] = Composer[C]
def __init__(self, schema: Schema[C]):
'''

View File

@ -44,87 +44,6 @@ def named_results(table, results):
for r in results
]
# RAW CURSOR-RESULT MANIPULATION -- SEE SA-PREFIXED METHODS FOR CONN-WRAPPED COUNTERPARTS
def result_mappings(results):
return results.mappings()
def result_mappings_all(results):
return result_mappings(results).all()
def result_dicts(results, query_cols=None):
'''
Parse SQLAlchemy results into Python dicts. Leverages mappings to associate full
column name context.
If `query_cols` is provided, their implicit names will be used for the keys of the
returned dictionaries. This information is not available under CursorResults and thus
must be provided separately. This will yield results like the following:
[..., {'table1.col':<value>, 'table2.col':<value>, ...}, ...]
Instead of the automatic mapping names:
[..., {'col':<value>, 'col_1':<value>, ...}, ...]
which can make accessing certain results a little more intuitive.
'''
if query_cols:
return [
{ str(c):r[c] for c in query_cols }
for r in result_mappings_all(results)
]
return [dict(r) for r in result_mappings_all(results)]
def sa_execute(engine, stmt, bind_params=None, include_cols=False):
'''
Simple single-statement execution in a with-block
'''
with engine.connect() as conn:
res = conn.execute(stmt, bind_params)
if include_cols:
cols = list(res.mappings().keys())
return res, cols
return res
def sa_exec_mappings(engine, stmt, bind_params=None, include_cols=False):
'''
All mappings fetched inside of connect context, safe to access outside
'''
with engine.connect() as conn:
res = conn.execute(stmt, bind_params)
mappings = result_mappings_all(res)
if include_cols:
cols = list(res.mappings().keys())
return mappings, cols
return mappings
def sa_exec_dicts(engine, stmt, bind_params=None, include_cols=False):
with engine.connect() as conn:
res = conn.execute(stmt, bind_params)
dicts = result_dicts(res)
if include_cols:
cols = list(res.mappings().keys())
return dicts, cols
return dicts
def sa_exec_explicit(engine, stmt, bind_params=None):
with engine.connect() as conn:
trans = conn.begin() # start a new transaction explicitly
try:
result = conn.execute(stmt, bind_params)
trans.commit() # commit the transaction explicitly
return result
except:
trans.rollback() # rollback the transaction explicitly
raise
def deferred_fkey(target, **kwargs):
return sa.ForeignKey(
target,
@ -140,7 +59,6 @@ def deferred_cd_fkey(target, **kwargs):
'''
return deferred_fkey(target, ondelete='CASCADE', **kwargs)
def get_column_names_str_table(engine, table: str):
col_sql = f'PRAGMA table_info({table});'
with engine.connect() as connection: