Skip to content

Commit 12b52ec

Browse files
committed
feat(cache): add caching mechanism for chats and users
- Introduced a new `Cache` module to manage chat and user data. - Implemented methods to load, save, and retrieve cached data using `bincode` for serialization. - Integrated the cache into the `Client`, `Context`, and `Dispatcher` modules. - Added auto-save functionality for the cache at regular intervals and during application shutdown. - Updated `Cargo.toml` to include `bincode` as a dependency for encoding/decoding cache data. - Enhanced `Dispatcher` to save chats and users to the cache when processing updates.
1 parent ea53d9d commit 12b52ec

File tree

6 files changed

+252
-40
lines changed

6 files changed

+252
-40
lines changed

lib/ferogram/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,14 @@ ferogram-macros = { path = "../ferogram-macros", version = "0.1.0", optional = t
2929
grammers-client = { git = "https://github.com/Lonami/grammers.git", version = "0.7.0" }
3030
grammers-mtsender = { git = "https://github.com/Lonami/grammers.git", version = "0.7.0" }
3131

32-
log = "0.4.25"
32+
log = "^0.4"
3333
url = { version = "^2.5", optional = true }
3434
mlua = { version = "^0.10", features = ["async", "lua54", "module"], optional = true }
3535
pyo3 = { version = "^0.23", features = ["experimental-async", "macros"], optional = true }
36-
regex = "1.11.1"
36+
regex = "^1.11"
3737
tokio = { version = "^1.43", features = ["fs", "rt", "signal", "sync"] }
38-
rpassword = "7.3.1"
38+
bincode = { version = "^2.0" }
39+
rpassword = "^7.3"
3940
async-trait = "^0.1"
4041
futures-util = { version = "^0.3", default-features = false, features = ["alloc"] }
4142
async-recursion = "^1.1"

lib/ferogram/src/cache.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright 2024-2025 - Andriel Ferreira
2+
//
3+
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5+
// <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
6+
// option. This file may not be copied, modified, or distributed
7+
// except according to those terms.
8+
9+
//! Cache module.
10+
11+
use std::{collections::HashMap, path::Path, sync::Arc};
12+
13+
use bincode::{
14+
Decode, Encode, config,
15+
de::Decoder,
16+
enc::Encoder,
17+
error::{DecodeError, EncodeError},
18+
};
19+
use grammers_client::types::PackedChat;
20+
use tokio::sync::RwLock;
21+
22+
/// The cache.
23+
#[derive(Clone, Debug, Default)]
24+
pub struct Cache {
25+
/// The inner cache.
26+
inner: Arc<RwLock<InnerCache>>,
27+
}
28+
29+
impl Cache {
30+
/// Load a previous cache instance from a file or create one if it doesn’t exist.
31+
pub fn load_file_or_create<P: AsRef<Path>>(path: P) -> crate::Result<Self> {
32+
// try to open the cache file.
33+
if let Ok(mut file) = std::fs::File::open(&path) {
34+
// get the standard config.
35+
let config = config::standard();
36+
37+
// construct the inner cache.
38+
let inner: InnerCache = bincode::decode_from_std_read(&mut file, config)?;
39+
40+
log::debug!("loaded {} chats from cache", inner.chats.len());
41+
42+
Ok(Self {
43+
inner: Arc::new(RwLock::new(inner)),
44+
})
45+
} else {
46+
log::debug!("no cache was found, generating a new one");
47+
48+
Ok(Self {
49+
inner: Arc::new(RwLock::new(InnerCache::default())),
50+
})
51+
}
52+
}
53+
54+
/// Try to save the cache to a file.
55+
pub async fn save_to_file<P: AsRef<Path>>(&self, path: P) -> crate::Result<()> {
56+
// delete the cache file if it exists.
57+
if std::fs::exists(&path)? {
58+
std::fs::remove_file(&path)?;
59+
}
60+
61+
// create the cache file.
62+
let mut file = std::fs::File::create(path)?;
63+
64+
// get the standard config.
65+
let config = config::standard();
66+
67+
// clone the inner.
68+
let inner = self.inner.write().await.clone();
69+
70+
// write to the file.
71+
bincode::encode_into_std_write(inner, &mut file, config)?;
72+
73+
Ok(())
74+
}
75+
76+
/// Gets a saved chat by its ID.
77+
pub fn get_chat(&self, chat_id: i64) -> Option<PackedChat> {
78+
let inner = self.inner.try_read().expect("failed to get saved chats");
79+
80+
inner.chats.get(&chat_id).cloned()
81+
}
82+
83+
/// Saves a chat in the cache.
84+
pub(crate) async fn save_chat(&self, chat: PackedChat) -> crate::Result<()> {
85+
let mut inner = self.inner.write().await;
86+
87+
if !inner.chat_exists(chat.id) {
88+
log::trace!("saved a new chat: {:?}", chat);
89+
90+
inner.push_chat(chat);
91+
}
92+
93+
Ok(())
94+
}
95+
}
96+
97+
/// The inner cache.
98+
#[derive(Clone, Debug, Default)]
99+
struct InnerCache {
100+
/// The packed chat map.
101+
chats: HashMap<i64, PackedChat>,
102+
}
103+
104+
impl InnerCache {
105+
/// Pushes a chat.
106+
pub fn push_chat(&mut self, chat: PackedChat) {
107+
self.chats.entry(chat.id).or_insert(chat);
108+
}
109+
110+
/// Checks if a chat exists.
111+
pub fn chat_exists(&self, chat_id: i64) -> bool {
112+
self.chats.contains_key(&chat_id)
113+
}
114+
}
115+
116+
impl Encode for InnerCache {
117+
fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
118+
// convert chats to bytes.
119+
let chats = self
120+
.chats
121+
.clone()
122+
.into_iter()
123+
.map(|(id, chat)| (id, chat.to_bytes()))
124+
.collect::<HashMap<_, _>>();
125+
126+
Encode::encode(&chats, encoder)?;
127+
128+
Ok(())
129+
}
130+
}
131+
132+
impl<Context> Decode<Context> for InnerCache {
133+
fn decode<D: Decoder<Context = Context>>(decoder: &mut D) -> Result<Self, DecodeError> {
134+
// convert bytes to chats.
135+
let encoded_chats: HashMap<i64, [u8; 17]> = Decode::decode(decoder)?;
136+
let chats = encoded_chats
137+
.into_iter()
138+
.map(|(id, bytes)| {
139+
(
140+
id,
141+
PackedChat::from_bytes(&bytes).expect("failed to decode chat bytes"),
142+
)
143+
})
144+
.collect::<HashMap<_, _>>();
145+
146+
Ok(Self { chats })
147+
}
148+
}

lib/ferogram/src/client.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88

99
//! Client module.
1010
11-
use std::path::Path;
11+
use std::{path::Path, time::Duration};
1212

1313
use grammers_client::{
1414
Config, InitParams, ReconnectionPolicy, SignInError, grammers_tl_types as tl, session::Session,
1515
};
1616
use grammers_mtsender::ServerAddr;
1717

18-
use crate::{Context, Dispatcher, ErrorHandler, Result, di, utils::prompt};
18+
use crate::{Cache, Context, Dispatcher, ErrorHandler, Result, di, utils::prompt};
1919

2020
/// Wrapper about grammers' `Client` instance.
2121
pub struct Client {
@@ -26,14 +26,16 @@ pub struct Client {
2626
/// The inner grammers' `Client` instance.
2727
inner_client: grammers_client::Client,
2828

29+
/// The session cache.
30+
cache: Cache,
2931
/// The session file path.
3032
session_file: Option<String>,
3133

3234
/// Whether the client is connected.
3335
is_connected: bool,
3436
/// Whether is to update Telegram's bot commands.
3537
set_bot_commands: bool,
36-
/// Wheter is to wait for a `Ctrl + C` signal to close the connection and exit the app.
38+
/// Whether is to wait for a `Ctrl + C` signal to close the connection and exit the app.
3739
wait_for_ctrl_c: bool,
3840

3941
/// The global error handler.
@@ -226,7 +228,7 @@ impl Client {
226228
pub fn new_ctx(&self) -> Context {
227229
let upd_receiver = self.dispatcher.upd_sender.subscribe();
228230

229-
Context::new(&self.inner_client, upd_receiver)
231+
Context::new(&self.cache, &self.inner_client, upd_receiver)
230232
}
231233

232234
/// Listen to Telegram's updates and send them to the dispatcher's routers.
@@ -239,6 +241,7 @@ impl Client {
239241
/// # }
240242
/// ```
241243
pub async fn run(self) -> Result<()> {
244+
let cache = self.cache.clone();
242245
let handle = self.inner_client;
243246
let dispatcher = self.dispatcher;
244247
let err_handler = self.err_handler;
@@ -286,12 +289,13 @@ impl Client {
286289
loop {
287290
match handle.next_update().await {
288291
Ok(update) => {
292+
let cache = cache.clone();
289293
let client = handle.clone();
290294
let mut dp = dispatcher.clone();
291295
let err_handler = err_handler.clone();
292296

293297
tokio::task::spawn(async move {
294-
if let Err(e) = dp.handle_update(&client, &update).await {
298+
if let Err(e) = dp.handle_update(&cache, &client, &update).await {
295299
if let Some(err_handler) = err_handler.as_ref() {
296300
err_handler.run(client, update, e).await;
297301
} else {
@@ -308,6 +312,21 @@ impl Client {
308312
});
309313

310314
if self.wait_for_ctrl_c {
315+
let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
316+
let cache_file = format!("{}.cc", session_file);
317+
318+
let cache = self.cache.clone();
319+
let path = cache_file.clone();
320+
tokio::task::spawn(async move {
321+
loop {
322+
tokio::time::sleep(Duration::from_secs(60)).await;
323+
324+
if let Err(e) = cache.save_to_file(&path).await {
325+
log::error!("failed to auto-save cache: {}", e);
326+
}
327+
}
328+
});
329+
311330
tokio::signal::ctrl_c().await?;
312331

313332
if let Some(mut handler) = self.exit_handler {
@@ -317,8 +336,8 @@ impl Client {
317336
handler.handle(&mut injector).await.unwrap();
318337
}
319338

320-
let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
321339
client.session().save_to_file(session_file)?;
340+
self.cache.save_to_file(cache_file).await?;
322341
}
323342

324343
Ok(())
@@ -335,6 +354,7 @@ impl Client {
335354
/// ```
336355
pub async fn keep_alive(self) -> Result<()> {
337356
let handle = self.inner_client;
357+
let client = handle.clone();
338358

339359
tokio::task::spawn(async move {
340360
loop {
@@ -344,6 +364,12 @@ impl Client {
344364

345365
if self.wait_for_ctrl_c {
346366
tokio::signal::ctrl_c().await?;
367+
368+
let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
369+
let cache_file = &format!("{}.cc", session_file);
370+
371+
client.session().save_to_file(session_file)?;
372+
self.cache.save_to_file(cache_file).await?;
347373
}
348374

349375
Ok(())
@@ -427,6 +453,15 @@ impl ClientBuilder {
427453
/// ```
428454
pub async fn build(self) -> Result<Client> {
429455
let session_file = self.session_file.as_deref().unwrap_or("./ferogram.session");
456+
let cache_file = &format!("{}.cc", session_file);
457+
458+
// check if the session file don't exists and the cache file exists.
459+
if !std::fs::exists(session_file)? && std::fs::exists(cache_file)? {
460+
// delete the cache file if the session file changes.
461+
std::fs::remove_file(cache_file)?;
462+
463+
log::debug!("session file changed, deleting the cache file");
464+
}
430465

431466
let inner_client = grammers_client::Client::connect(Config {
432467
session: Session::load_file_or_create(session_file)?,
@@ -441,6 +476,7 @@ impl ClientBuilder {
441476
client_type: self.client_type,
442477
inner_client,
443478

479+
cache: Cache::load_file_or_create(cache_file)?,
444480
session_file: Some(session_file.to_string()),
445481

446482
is_connected: false,

lib/ferogram/src/context.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ use tokio::{
2323
sync::{Mutex, broadcast::Receiver},
2424
};
2525

26-
use crate::{Filter, utils::bytes_to_string};
26+
use crate::{Cache, Filter, utils::bytes_to_string};
2727

2828
/// The context of an update.
2929
#[derive(Debug)]
3030
pub struct Context {
31-
// The client that received the update.
31+
/// The session cache.
32+
pub cache: Cache,
33+
/// The client that received the update.
3234
client: grammers_client::Client,
3335
/// The update itself.
3436
update: Option<Update>,
@@ -38,8 +40,13 @@ pub struct Context {
3840

3941
impl Context {
4042
/// Creates a new context.
41-
pub fn new(client: &grammers_client::Client, upd_receiver: Receiver<Update>) -> Self {
43+
pub fn new(
44+
cache: &Cache,
45+
client: &grammers_client::Client,
46+
upd_receiver: Receiver<Update>,
47+
) -> Self {
4248
Self {
49+
cache: cache.clone(),
4350
client: client.clone(),
4451
update: None,
4552
upd_receiver: Arc::new(Mutex::new(upd_receiver)),
@@ -48,11 +55,13 @@ impl Context {
4855

4956
/// Creates a new context with an update.
5057
pub fn with(
58+
cache: &Cache,
5159
client: &grammers_client::Client,
5260
update: &Update,
5361
upd_receiver: Receiver<Update>,
5462
) -> Self {
5563
Self {
64+
cache: cache.clone(),
5665
client: client.clone(),
5766
update: Some(update.clone()),
5867
upd_receiver: Arc::new(Mutex::new(upd_receiver)),
@@ -77,6 +86,7 @@ impl Context {
7786
.expect("Failed to lock receiver");
7887

7988
Self {
89+
cache: self.cache.clone(),
8090
client: self.client.clone(),
8191
update: Some(update.clone()),
8292
upd_receiver: Arc::new(Mutex::new(upd_receiver.resubscribe())),
@@ -1259,6 +1269,7 @@ impl Clone for Context {
12591269
.expect("Failed to lock receiver");
12601270

12611271
Self {
1272+
cache: self.cache.clone(),
12621273
client: self.client.clone(),
12631274
update: self.update.clone(),
12641275
upd_receiver: Arc::new(Mutex::new(upd_receiver.resubscribe())),

0 commit comments

Comments
 (0)