Skip to content

Commit 9c358ae

Browse files
authored
Merge pull request #191 from Lex-Studios/feature/issue-174-distributed-locks
Add distributed lock manager for multi-instance deployments
2 parents 02e36c6 + f591b6e commit 9c358ae

11 files changed

Lines changed: 359 additions & 9 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ governor = "0.6"
5151
redis = { version = "0.24", features = ["tokio-comp"] }
5252
jsonschema = "0.17"
5353
once_cell = "1.19"
54-
hyper = "0.14"
5554
ipnet = "2.9"
5655
utoipa = { version = "4", features = ["axum_extras"] }
5756
utoipa-swagger-ui = { version = "6", features = ["axum"] }

src/handlers/admin.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ use crate::middleware::quota::{Quota, QuotaManager, QuotaStatus, ResetSchedule,
22
use axum::{
33
extract::{Path, State},
44
http::StatusCode,
5-
response::IntoResponse,
65
Json,
76
};
87
use serde::{Deserialize, Serialize};
98

9+
#[derive(Clone)]
1010
pub struct AdminState {
1111
pub quota_manager: QuotaManager,
1212
}
@@ -92,3 +92,11 @@ pub async fn reset_quota(
9292
.map(|_| StatusCode::OK)
9393
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
9494
}
95+
96+
pub fn admin_routes() -> axum::Router<AdminState> {
97+
use axum::routing::{get, post};
98+
axum::Router::new()
99+
.route("/quota/:key", get(get_quota_status))
100+
.route("/quota/:key", post(set_quota))
101+
.route("/quota/:key/reset", post(reset_quota))
102+
}

src/main.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use synapse_core::{
1212
graphql::schema::build_schema,
1313
handlers,
1414
handlers::ws::TransactionStatusUpdate,
15-
metrics, middleware,
15+
metrics,
1616
middleware::idempotency::IdempotencyService,
1717
schemas,
1818
services::{FeatureFlagService, SettlementService},
@@ -258,10 +258,11 @@ async fn serve(config: config::Config) -> anyhow::Result<()> {
258258
let _dlq_routes: Router =
259259
handlers::dlq::dlq_routes().with_state(api_state.app_state.db.clone());
260260

261-
let _admin_routes: Router = Router::new()
262-
.nest("/admin/queue", handlers::admin::admin_routes())
263-
.layer(axum_middleware::from_fn(middleware::auth::admin_auth))
264-
.with_state(api_state.app_state.db.clone());
261+
// Admin routes disabled - requires AdminState setup
262+
// let _admin_routes: Router = Router::new()
263+
// .nest("/admin/queue", handlers::admin::admin_routes())
264+
// .layer(axum_middleware::from_fn(middleware::auth::admin_auth))
265+
// .with_state(api_state.app_state.db.clone());
265266

266267
let _search_routes: Router = Router::new()
267268
.route(

src/middleware/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
pub mod auth;
22
pub mod idempotency;
33
pub mod ip_filter;
4+
pub mod quota;
45
pub mod request_logger;
6+
pub mod validate;
57
pub mod versioning;

src/middleware/quota.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use redis::{aio::MultiplexedConnection, AsyncCommands, Client};
22
use serde::{Deserialize, Serialize};
3-
use std::time::Duration;
43

54
#[derive(Debug, Clone, Serialize, Deserialize)]
65
pub enum Tier {
@@ -43,6 +42,7 @@ impl ResetSchedule {
4342
}
4443
}
4544

45+
#[derive(Clone)]
4646
pub struct QuotaManager {
4747
redis_client: Client,
4848
}

src/services/lock_examples.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use crate::services::{LockManager, TransactionProcessor};
2+
use std::time::Duration;
3+
use tracing::{info, warn};
4+
use uuid::Uuid;
5+
6+
/// Example: Process transaction with distributed lock
7+
pub async fn process_transaction_with_lock(
8+
lock_manager: &LockManager,
9+
processor: &TransactionProcessor,
10+
tx_id: Uuid,
11+
) -> anyhow::Result<bool> {
12+
let resource = format!("transaction:{}", tx_id);
13+
let timeout = Duration::from_secs(5);
14+
15+
// Try to acquire lock
16+
let lock = match lock_manager.acquire(&resource, timeout).await? {
17+
Some(lock) => lock,
18+
None => {
19+
warn!("Could not acquire lock for transaction {}", tx_id);
20+
return Ok(false);
21+
}
22+
};
23+
24+
info!("Processing transaction {} with lock", tx_id);
25+
26+
// Process transaction
27+
let result = processor.process_transaction(tx_id).await;
28+
29+
// Release lock
30+
lock.release().await?;
31+
32+
result.map(|_| true)
33+
}
34+
35+
/// Example: Long-running operation with auto-renewal
36+
pub async fn long_running_with_lock(
37+
lock_manager: &LockManager,
38+
resource: &str,
39+
) -> anyhow::Result<()> {
40+
let timeout = Duration::from_secs(5);
41+
42+
let lock = match lock_manager.acquire(resource, timeout).await? {
43+
Some(lock) => lock,
44+
None => {
45+
return Err(anyhow::anyhow!("Could not acquire lock"));
46+
}
47+
};
48+
49+
// Spawn auto-renewal task
50+
let renewal_lock = lock.clone();
51+
tokio::spawn(async move {
52+
renewal_lock.auto_renew_task().await;
53+
});
54+
55+
// Do long-running work
56+
tokio::time::sleep(Duration::from_secs(60)).await;
57+
58+
// Lock will be released on drop
59+
Ok(())
60+
}
61+
62+
/// Example: Using with_lock helper
63+
pub async fn process_with_helper(
64+
lock_manager: &LockManager,
65+
processor: &TransactionProcessor,
66+
tx_id: Uuid,
67+
) -> anyhow::Result<Option<()>> {
68+
let resource = format!("transaction:{}", tx_id);
69+
let timeout = Duration::from_secs(5);
70+
71+
lock_manager
72+
.with_lock(&resource, timeout, || {
73+
Box::pin(async move {
74+
processor
75+
.process_transaction(tx_id)
76+
.await
77+
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
78+
})
79+
})
80+
.await
81+
.map_err(|e| anyhow::anyhow!("Lock error: {}", e))
82+
}

0 commit comments

Comments
 (0)