Replies: 3 comments 2 replies
-
Beta Was this translation helpful? Give feedback.
0 replies
-
I couldn't figure out a solution that didn't fall apart and moved to fastapi. See #1920 |
Beta Was this translation helpful? Give feedback.
2 replies
-
A Django-like approach: wrap everything into a global transaction per app's lifecycle. Haven't tested on any complicated code yet, but at least it looks like a good approach. from contextlib import asynccontextmanager
from typing import AsyncGenerator, cast
from litestar import Litestar
from litestar.datastructures import State
from litestar.plugins.sqlalchemy import SQLAlchemyAsyncConfig
from litestar.types import Scope
from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
class SQLAlchemyAsyncConfigTransactional(SQLAlchemyAsyncConfig):
"""
Wraps app's lifecycle into one transaction.
So multiple commits/rollbacks per single test are possible.
- One engine connection plus one outer transaction for the app lifespan (per test)
- Sessionmaker bound to that connection
- Each provided session runs inside a SAVEPOINT that is recreated after each commit/rollback
"""
@asynccontextmanager
async def lifespan(self, app: Litestar) -> AsyncGenerator[None, None]:
deps = self.create_app_state_items()
app.state.update(deps)
engine = cast(AsyncEngine, app.state[self.engine_app_state_key])
conn = await engine.connect()
outer_tx = await conn.begin()
# Replace the sessionmaker with one bound to this single connection
session_maker = async_sessionmaker(bind=conn,
expire_on_commit=getattr(self.session_config, "expire_on_commit", False),
class_=AsyncSession, )
app.state[self.session_maker_app_state_key] = session_maker
try:
if self.create_all:
await self.create_all_metadata(app)
yield
finally:
# Roll back everything done during the lifespan (i.e., per test)
if outer_tx.is_active:
await outer_tx.rollback()
await conn.close()
# Dispose engine created by base config
eng = app.state.get(self.engine_app_state_key)
if eng is not None and hasattr(eng, "dispose"):
await cast(AsyncEngine, eng).dispose()
def provide_session(self, state: State, scope: Scope) -> AsyncSession:
"""
Create a session instance that would be rolled back on app's shutdown.
"""
session = super().provide_session(state, scope)
# Expected only 1 session per lifecycle
# in case anyone called this method more than once
if "_savepoint_listener_installed" not in session.info:
session.sync_session.begin_nested()
@event.listens_for(session.sync_session, "after_transaction_end")
def _restart_savepoint(sess, trans):
if trans.nested and not trans._parent.is_active:
sess.begin_nested()
session.info["_savepoint_listener_installed"] = True
return session Use like this
P.S. |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
I'm using the built-in litestar plugin for SQLAlchemy and I'm trying to figure out how to make it use the same connection as the one I created in tests. The goal is to run a rollback on the connection after each test to avoid side effects. In my current implementation, I get the following error:
My guess is that litestar runs in a different event loop from the one that is used by pytest-asyncio but I don't see a way to pass an event loop into the app. Did anyone already solve the problem? Do you have rollbacks in your tests?
This is the full code of the test fixtures:
Beta Was this translation helpful? Give feedback.
All reactions