Concept: Rust database access via Python database connection pool (via runInteraction) (LLM splat v1) #19846
| name: &'static str, | ||
| func: InteractionFn, | ||
| ) -> AnyResult; | ||
| } |
From my high-level reading of things, the type erasure stuff here is because we want DatabasePool to be dyn-compatible because we currently store it like this:
pub struct Store {
    pub config: SynapseConfig,
    pub db_pool: Box<dyn DatabasePool>,
}
If we drop that constraint and instead store the DatabasePool like the following, all of the erasure complexity can fall away.
pub struct Store<P: DatabasePool> {
    pub config: SynapseConfig,
    pub db_pool: P,
}
I think the answer depends on whether we care about the Store being Synapse specific or if that also should be shared with synapse-rust-apps.
I think synapse-rust-apps might have its own Store implementation given we already store things that are Synapse specific in it like SynapseConfig. Although that could be refactored to be different 🤔
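To make the trade-off concrete, here is a minimal sketch using simplified stand-ins rather than the PR's real types. The trait method `run`, the `InMemoryPool` type, the `server_name` field and `count_statements` are all hypothetical, and the "transaction" is just a `Vec<String>`. The point is only that a generic method makes the trait non-dyn-compatible, which is fine once `Store` is generic over the pool.

```rust
/// Placeholder for the real config type (an assumption for this sketch).
pub struct SynapseConfig {
    pub server_name: String,
}

/// Simplified stand-in for the PR's trait. The generic method `run` makes the
/// trait non-dyn-compatible, so `Box<dyn DatabasePool>` would not compile.
pub trait DatabasePool {
    fn run<R, F>(&self, name: &'static str, func: F) -> Result<R, String>
    where
        F: FnOnce(&mut Vec<String>) -> Result<R, String>;
}

/// Hypothetical pool whose "transaction" is an in-memory list of statements.
pub struct InMemoryPool;

impl DatabasePool for InMemoryPool {
    fn run<R, F>(&self, _name: &'static str, func: F) -> Result<R, String>
    where
        F: FnOnce(&mut Vec<String>) -> Result<R, String>,
    {
        let mut txn = Vec::new();
        func(&mut txn)
    }
}

/// Generic over the pool: no boxing and no type erasure needed.
pub struct Store<P: DatabasePool> {
    pub config: SynapseConfig,
    pub db_pool: P,
}

impl<P: DatabasePool> Store<P> {
    /// Hypothetical query that records one statement and returns the count.
    pub fn count_statements(&self) -> Result<usize, String> {
        self.db_pool.run("count_statements", |txn| {
            txn.push("SELECT 1".to_owned());
            Ok(txn.len())
        })
    }
}
```

The cost of this shape is that every type holding a `Store` also becomes generic (or picks a concrete pool), which is the "should it be shared with synapse-rust-apps" question in a different form.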
Added d9a111b to see what this looks like
| // (3) Kick off `runInteraction` on the reactor thread. It's a | ||
| // coroutine, so we wrap it with `ensureDeferred` and attach our | ||
| // callbacks. | ||
| let database_pool_py = self.database_pool_py.clone_ref(py); | ||
| let starter = PyCFunction::new_closure( | ||
| py, | ||
| None, | ||
| None, | ||
| move |args, _kwargs| -> PyResult<Py<PyAny>> { | ||
| let py = args.py(); | ||
| let coro = database_pool_py.bind(py).call_method1( | ||
| intern!(py, "runInteraction"), | ||
| (name, callback.bind(py)), | ||
| )?; | ||
| let deferred = py | ||
| .import("twisted.internet.defer")? | ||
| .call_method1(intern!(py, "ensureDeferred"), (coro,))?; | ||
| deferred.call_method1( | ||
| intern!(py, "addCallbacks"), | ||
| (on_success.bind(py), on_error.bind(py)), | ||
| )?; | ||
| Ok(py.None()) | ||
| }, | ||
| )?; | ||
|
|
||
| self.reactor | ||
| .bind(py) | ||
| .call_method1(intern!(py, "callFromThread"), (starter,))?; |
All of this looks suspect. We shouldn't have to callFromThread as runInteraction already handles creating a thread to work from.
| /// | ||
| /// Does not handle deferred cancellation or contextvars. | ||
| fn create_deferred<'py, F, O>( | ||
| pub(crate) fn create_deferred<'py, F, O>( |
This should be moved to its own general utility file instead of sharing it this way
| let txn_py = args.get_item(0)?; | ||
| let mut txn = txn_py.extract::<LoggingTransactionWrapper>()?; | ||
|
|
||
| let result = futures::executor::block_on(func(&mut txn)); |
Using futures::executor::block_on(func(&mut txn)); looks fishy but I think it's fine.
The reason func is async is because txn.query(...).await represents a round-trip to the database, which tokio-postgres rightly models as async, and we want to be compatible with that.
Here on the Python side, txn.query() is backed by blocking Python functions (and the whole transaction gets its own thread to work on). If there is an even simpler/easier way to drive the func here, please shout.
As @reivilibre pointed out in some off the cuff thoughts before any of this kicked off, we just need to document that "'transactions look like async functions but should only call .await on the database' which is a minor cosmetic flaw but seems fine, as you don't want to be doing any unnecessary waiting in your transaction anyway."
As a thought, I don't think one should necessarily need an executor for this pattern in Synapse Python at all.
For this reason I am tempted to see what it feels like if we avoided futures::executor::block_on as a first pass and instead manually polled the Future a single time.
Because in Synapse Python we would have no genuine async in this path, it should immediately resolve to Poll::Ready.
Getting Poll::Pending would be considered a programming error.
block_on probably looks 'cleaner' in terms of syntax, but a single-shot poll is more enforcing of the concept we want to represent (and so gets my vote at a first pass).
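A minimal sketch of what the single-shot poll could look like, using only the standard library. The helper name `poll_once` and the `NoopWaker` type are made up for illustration and are not part of the PR. It polls the future exactly once with a waker that does nothing and reports whether it completed.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing: we poll exactly once, so nothing should wake us.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Poll `fut` a single time. Returns `Some(output)` if it completed
/// immediately, or `None` if it returned `Poll::Pending`.
pub fn poll_once<F: Future>(fut: F) -> Option<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    // Pin the future on the stack so it can be polled.
    let mut fut = pin!(fut);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}
```

In the transaction callback, a `None` here would be surfaced as an error (a programming error in the interaction function) rather than waited on, which is the enforcement that block_on does not give us.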
| /// lowest common denominator that both the Python (`LoggingTransaction`) and | ||
| /// native `tokio-postgres` backends can produce. Callers are responsible for | ||
| /// parsing the strings into richer types as needed. | ||
| pub type Row = Vec<String>; |
FWIW I think you can replace String with a custom type:
pub enum DbValue {
    String(String), Bytes(Vec<u8>), Int(i64), Bool(bool), Null,
}
which should be easily convertible on the Python boundary, and when talking to a Rust DB pool.
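As a rough sketch of how that could look in practice: the enum is the one suggested above, while the derives, the `From` impls, the `as_str`/`as_i64` accessors and redefining `Row` as `Vec<DbValue>` are assumptions added for illustration, not code from the PR.

```rust
/// The enum suggested above, with derives added for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub enum DbValue {
    String(String),
    Bytes(Vec<u8>),
    Int(i64),
    Bool(bool),
    Null,
}

/// Assumed replacement for `Vec<String>`.
pub type Row = Vec<DbValue>;

impl From<&str> for DbValue {
    fn from(value: &str) -> Self {
        DbValue::String(value.to_owned())
    }
}

impl From<i64> for DbValue {
    fn from(value: i64) -> Self {
        DbValue::Int(value)
    }
}

impl From<bool> for DbValue {
    fn from(value: bool) -> Self {
        DbValue::Bool(value)
    }
}

/// `None` maps to SQL NULL; `Some(v)` maps to whatever `v` converts to.
impl<T: Into<DbValue>> From<Option<T>> for DbValue {
    fn from(value: Option<T>) -> Self {
        value.map_or(DbValue::Null, Into::into)
    }
}

impl DbValue {
    /// Borrow the value as a string slice if it is a `String`.
    pub fn as_str(&self) -> Option<&str> {
        match self {
            DbValue::String(s) => Some(s),
            _ => None,
        }
    }

    /// Return the value as an integer if it is an `Int`.
    pub fn as_i64(&self) -> Option<i64> {
        match self {
            DbValue::Int(i) => Some(*i),
            _ => None,
        }
    }
}
```

Callers then get typed access without re-parsing strings, and both backends only need to map their native column types onto these five variants.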
| ) -> impl Future<Output = anyhow::Result<R>> + Send | ||
| where | ||
| R: Send + 'static, | ||
| F: for<'txn> Fn(&'txn mut dyn Transaction) -> BoxFuture<'txn, anyhow::Result<R>> |
You could probably relax this to a FnMut?
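For reference, a tiny synchronous sketch of what the relaxation buys. The helper `run_twice` is hypothetical and much simpler than the PR's signature (no transaction argument, no future). With an `FnMut` bound the callback may mutate captured state between calls, and every closure that satisfied `Fn` still satisfies `FnMut`.

```rust
/// Call `func` twice in sequence. `FnMut` is sufficient because the calls
/// never overlap; an `Fn` bound would reject closures that mutate captures.
pub fn run_twice<R, F>(mut func: F) -> (R, R)
where
    F: FnMut() -> R,
{
    (func(), func())
}
```

For example, `run_twice(|| { attempts += 1; attempts })` compiles with this bound but would be rejected if the bound were `Fn`.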
`_trial_temp/test.log`
```
2026-06-17 19:17:57-0500 [-] synapse.http.server - 147 - ERROR - GET-3 - Failed handle request via 'VersionsRestServlet': <SynapseRequest at 0x7ff02b1cf9d0 method='GET' uri='/_matrix/client/versions' clientproto='1.1' site='test'>
Traceback (most recent call last):
  File "/home/eric/Documents/github/element/synapse/synapse/http/server.py", line 335, in _async_render_wrapper
    callback_return = await self._async_render(request)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/eric/Documents/github/element/synapse/synapse/http/server.py", line 576, in _async_render
    callback_return = await raw_callback_return
                      ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/eric/Documents/github/element/synapse/synapse/rest/client/versions.py", line 81, in on_GET
    versions_response_body = await self.rust_handlers.versions.get_versions(user_id)
                                   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^
RuntimeError: Tokio runtime is not running
```
…work on our async Rust function
Previously, we were running into this problem:
```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.rest.client.test_versions.VersionsTestCase.test_authenticated
tests.rest.client.test_versions
  VersionsTestCase
    test_authenticated ... [ERROR]
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/home/eric/Documents/github/element/synapse/tests/rest/client/test_versions.py", line 85, in test_authenticated
    channel = self.make_request(
  File "/home/eric/Documents/github/element/synapse/tests/unittest.py", line 619, in make_request
    return make_request(
  File "/home/eric/Documents/github/element/synapse/tests/server.py", line 486, in make_request
    channel.await_result()
  File "/home/eric/Documents/github/element/synapse/tests/server.py", line 310, in await_result
    raise TimedOutException("Timed out waiting for request to finish.")
tests.server.TimedOutException: Timed out waiting for request to finish.
tests.rest.client.test_versions.VersionsTestCase.test_authenticated
-------------------------------------------------------------------------------
Ran 1 tests in 0.182s
FAILED (errors=1)
```
…pool to work on our async Rust function" This reverts commit 0b6a973.
Fix Tokio thread waiting a different way
| # XXX: We must create the Rust HTTP client before we call `reactor.run()` below. | ||
| # Twisted's `MemoryReactor` doesn't invoke `callWhenRunning` callbacks if it's | ||
| # already running and we rely on that to start the Tokio thread pool in Rust. In | ||
| # the future, this may not matter, see https://github.com/twisted/twisted/pull/12514 | ||
| self._http_client = hs.get_proxied_http_client() | ||
| _ = HttpClient( | ||
| reactor=hs.get_reactor(), | ||
| user_agent=self._http_client.user_agent.decode("utf8"), | ||
| ) |
As an update, twisted/twisted#12514 is finally part of a Twisted release, 26.4.0 (2026-05-11). If we updated our Twisted version, we could probably get rid of all of this ugliness.
But that may be a few years away given our deprecation policy only considers an upgrade a "no-brainer" once it's available in both the latest Debian Stable (currently Twisted 24.11.0) and Ubuntu LTS (currently Twisted 25.5.0) repositories.
Note: This iteration isn't meant to be merged at all and is just meant to see if it's possible. Just splatting this here for reference. It does seem to work (see testing strategy) although I haven't inspected the code. I just asked an LLM to take my previous non-working structure and make it work.
Rust database access via Python database connection pool.
This is a stepping stone before we can go full Rust everywhere. We're providing a generic interface as we want database access to work in Synapse and `synapse-rust-apps`. In `synapse-rust-apps`, we will use a `tokio-postgres` based database connection pool so it's full Rust.
We want to avoid the situation where we have two database connection pools (one for Python, one for Rust) as we've run into connection exhaustion problems on Matrix.org before.
Why `runInteraction(...)`?
This is a variation of #19824 instead using the `runInteraction(...)` pattern we already have in Synapse. This means everything that uses the transaction happens in the function callback which enables retries for serialization/deadlock errors when using repeatable-read isolation.
Using the same `runInteraction` pattern also means we can port over existing Synapse code/endpoints without much thought.
This strategy is less of an impedance mismatch (aligns more closely) with Synapse so the glue code should also be simpler.
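To illustrate why keeping all transaction work inside the callback enables retries, here is a rough synchronous sketch of the pattern. Everything in it is hypothetical and simplified for illustration (`TxnError`, `Txn`, the `max_attempts` parameter and this `run_interaction` function); it is not the PR's implementation and not Synapse's Python `runInteraction`.

```rust
/// Hypothetical error type: only the retryable/fatal split matters here.
#[derive(Debug, PartialEq)]
pub enum TxnError {
    /// Serialization failure or deadlock: safe to re-run the whole callback.
    Retryable(String),
    /// Anything else: give up immediately.
    Fatal(String),
}

/// Hypothetical transaction handle that just records statements.
#[derive(Default)]
pub struct Txn {
    pub statements: Vec<String>,
}

/// Run `func` inside a fresh transaction, re-running it from scratch when it
/// fails with a retryable error, up to `max_attempts` attempts in total.
pub fn run_interaction<R, F>(
    name: &'static str,
    max_attempts: u32,
    mut func: F,
) -> Result<R, TxnError>
where
    F: FnMut(&mut Txn) -> Result<R, TxnError>,
{
    let mut attempt = 1;
    loop {
        // Each attempt starts from a clean transaction, so nothing from a
        // failed attempt leaks into the next one.
        let mut txn = Txn::default();
        match func(&mut txn) {
            Err(TxnError::Retryable(_)) if attempt < max_attempts => {
                eprintln!("{}: retrying after attempt {}", name, attempt);
                attempt += 1;
            }
            // Success, a fatal error, or the final retryable error.
            other => return other,
        }
    }
}
```

Because the callback only touches state through `txn`, re-running it is safe. That is the property the API in #19824 would not give us for free.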
Testing strategy
poetry run synapse_homeserver --config-path homeserver.yaml
GET http://localhost:8008/_matrix/client/versions
Dev notes
LLM Prompt
The goal is to achieve Rust database access via the Python database connection pool.
We're providing a generic interface as we want database access to work in Synapse and a separate project `synapse-rust-apps`. In `synapse-rust-apps`, we will use a `tokio-postgres` based database connection pool so it's full Rust. This is what's being prototyped in `rust/src/storage/db/rust_db_pool.rs` vs the actual implementation we'll keep here in `rust/src/storage/db/python_db_pool.rs`.
We want to stick with the `runInteraction(...)` pattern from Synapse as it allows us to retry transactions in a world where we're using repeatable-read isolation level (serialization and deadlock errors).
I'm in the middle of getting it working and most of it is just non-working structure. Please finish the rest.
As an example, we're doing some Rust database access in `rust/src/handlers/versions.rs` -> `rust/src/storage/store.rs`
Transaction retries were originally introduced into Synapse for deadlocks which makes me question why they're even happening in the first place. How are other people dealing with this kind of problem? (poorly, not a problem, retries like us, etc)
@reivilibre points out that we're more likely to hit retries because of serialization errors which aligns with the Postgres docs (we use the repeatable read isolation level in Synapse):
Other PR where I dealt with deferred (`make_deferred_yieldable`) and async in Rust: #18903
Fix `MemoryReactor.callWhenRunning` not invoking callbacks if already started, twisted/twisted#12514 - this is relevant because we need to start the tokio thread pool for these tests.
PyO3 Rust -> Python logging: https://pyo3.rs/main/ecosystem/logging
Todo
`LoggingContext` (log context) interact