diff --git a/package.json b/package.json index fcadb1c..465617e 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "wealthfolio-app", "private": true, - "version": "1.0.14", + "version": "1.0.15", "type": "module", "scripts": { "dev": "vite", diff --git a/src-core/src/account/account_service.rs b/src-core/src/account/account_service.rs index 9165c91..93d3fe4 100644 --- a/src-core/src/account/account_service.rs +++ b/src-core/src/account/account_service.rs @@ -1,40 +1,42 @@ use crate::account::AccountRepository; use crate::fx::fx_service::CurrencyExchangeService; use crate::models::{Account, AccountUpdate, NewAccount}; -use diesel::r2d2::{ConnectionManager, Pool}; use diesel::Connection; use diesel::SqliteConnection; pub struct AccountService { account_repo: AccountRepository, - pool: Pool>, base_currency: String, } impl AccountService { - pub fn new(pool: Pool>, base_currency: String) -> Self { + pub fn new(base_currency: String) -> Self { AccountService { account_repo: AccountRepository::new(), - pool, base_currency, } } - pub fn get_accounts(&self) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.account_repo.load_accounts(&mut conn) + pub fn get_accounts( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + self.account_repo.load_accounts(conn) } - pub fn get_account_by_id(&self, account_id: &str) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.account_repo.load_account_by_id(&mut conn, account_id) + pub fn get_account_by_id( + &self, + conn: &mut SqliteConnection, + account_id: &str, + ) -> Result { + self.account_repo.load_account_by_id(conn, account_id) } pub async fn create_account( &self, + conn: &mut SqliteConnection, new_account: NewAccount, ) -> Result> { - let mut conn = self.pool.get()?; let base_currency = self.base_currency.clone(); println!( @@ -43,9 +45,12 @@ impl AccountService { ); conn.transaction(|conn| { if new_account.currency != base_currency { - let fx_service = CurrencyExchangeService::new(self.pool.clone()); - fx_service - .add_exchange_rate(base_currency.clone(), new_account.currency.clone())?; + let fx_service = CurrencyExchangeService::new(); + fx_service.add_exchange_rate( + conn, + base_currency.clone(), + new_account.currency.clone(), + )?; } // Insert new account @@ -57,28 +62,25 @@ impl AccountService { pub fn update_account( &self, + conn: &mut SqliteConnection, updated_account_data: AccountUpdate, ) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.account_repo - .update_account(&mut conn, updated_account_data) + self.account_repo.update_account(conn, updated_account_data) } pub fn delete_account( &self, + conn: &mut SqliteConnection, account_id_to_delete: String, ) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.account_repo - .delete_account(&mut conn, account_id_to_delete) + self.account_repo.delete_account(conn, account_id_to_delete) } pub fn get_accounts_by_ids( &self, + conn: &mut SqliteConnection, account_ids: &[String], ) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.account_repo - .load_accounts_by_ids(&mut conn, account_ids) + self.account_repo.load_accounts_by_ids(conn, account_ids) } } diff --git a/src-core/src/activity/activity_service.rs b/src-core/src/activity/activity_service.rs index af3c18f..0eae1c2 100644 --- a/src-core/src/activity/activity_service.rs +++ b/src-core/src/activity/activity_service.rs @@ -11,39 +11,42 @@ use crate::schema::activities; use csv::ReaderBuilder; use diesel::prelude::*; -use diesel::r2d2::{ConnectionManager, Pool}; use uuid::Uuid; pub struct ActivityService { repo: ActivityRepository, - pool: Pool>, base_currency: String, } impl ActivityService { - pub fn new(pool: Pool>, base_currency: String) -> Self { + pub fn new(base_currency: String) -> Self { ActivityService { repo: ActivityRepository::new(), - pool, base_currency, } } //load all activities - pub fn get_activities(&self) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.repo.get_activities(&mut conn) + pub fn get_activities( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + self.repo.get_activities(conn) } - pub fn get_trading_activities(&self) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.repo.get_trading_activities(&mut conn) + pub fn get_trading_activities( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + self.repo.get_trading_activities(conn) } - pub fn get_income_data(&self) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.repo.get_income_activities(&mut conn).map(|results| { + pub fn get_income_data( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + self.repo.get_income_activities(conn).map(|results| { results .into_iter() .map(|activity| IncomeData { @@ -59,6 +62,7 @@ impl ActivityService { pub fn search_activities( &self, + conn: &mut SqliteConnection, page: i64, // Page number, 1-based page_size: i64, // Number of items per page account_id_filter: Option>, // Optional account_id filter @@ -66,9 +70,8 @@ impl ActivityService { asset_id_keyword: Option, // Optional asset_id keyword for search sort: Option, // Optional sort ) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); self.repo.search_activities( - &mut conn, + conn, page, page_size, account_id_filter, @@ -81,16 +84,16 @@ impl ActivityService { //create a new activity and fetch related the asset profile pub async fn create_activity( &self, + conn: &mut SqliteConnection, mut activity: NewActivity, ) -> Result> { - let mut conn = self.pool.get()?; let asset_id = activity.asset_id.clone(); - let asset_service = AssetService::new(self.pool.clone()).await; - let account_service = AccountService::new(self.pool.clone(), self.base_currency.clone()); + let asset_service = AssetService::new().await; + let account_service = AccountService::new(self.base_currency.clone()); let asset_profile = asset_service - .get_asset_profile(&asset_id, Some(true)) + .get_asset_profile(conn, &asset_id, Some(true)) .await?; - let account = account_service.get_account_by_id(&activity.account_id)?; + let account = account_service.get_account_by_id(conn, &activity.account_id)?; conn.transaction(|conn| { // Update activity currency if asset_profile.currency is available @@ -106,9 +109,12 @@ impl ActivityService { // Create exchange rate if asset currency is different from account currency if activity.currency != account.currency { - let fx_service = CurrencyExchangeService::new(self.pool.clone()); - fx_service - .add_exchange_rate(account.currency.clone(), activity.currency.clone())?; + let fx_service = CurrencyExchangeService::new(); + fx_service.add_exchange_rate( + conn, + account.currency.clone(), + activity.currency.clone(), + )?; } // Insert the new activity into the database @@ -121,15 +127,15 @@ impl ActivityService { // update an activity pub async fn update_activity( &self, + conn: &mut SqliteConnection, mut activity: ActivityUpdate, ) -> Result> { - let mut conn = self.pool.get()?; - let asset_service = AssetService::new(self.pool.clone()).await; - let account_service = AccountService::new(self.pool.clone(), self.base_currency.clone()); + let asset_service = AssetService::new().await; + let account_service = AccountService::new(self.base_currency.clone()); let asset_profile = asset_service - .get_asset_profile(&activity.asset_id, Some(true)) + .get_asset_profile(conn, &activity.asset_id, Some(true)) .await?; - let account = account_service.get_account_by_id(&activity.account_id)?; + let account = account_service.get_account_by_id(conn, &activity.account_id)?; conn.transaction(|conn| { // Update activity currency if asset_profile.currency is available @@ -145,9 +151,12 @@ impl ActivityService { // Create exchange rate if asset currency is different from account currency if activity.currency != account.currency { - let fx_service = CurrencyExchangeService::new(self.pool.clone()); - fx_service - .add_exchange_rate(account.currency.clone(), activity.currency.clone())?; + let fx_service = CurrencyExchangeService::new(); + fx_service.add_exchange_rate( + conn, + account.currency.clone(), + activity.currency.clone(), + )?; } // Update the activity in the database @@ -160,14 +169,15 @@ impl ActivityService { // verify the activities import from csv file pub async fn check_activities_import( &self, + conn: &mut SqliteConnection, _account_id: String, file_path: String, ) -> Result, String> { - let asset_service = AssetService::new(self.pool.clone()).await; - let account_service = AccountService::new(self.pool.clone(), self.base_currency.clone()); - let fx_service = CurrencyExchangeService::new(self.pool.clone()); + let asset_service = AssetService::new().await; + let account_service = AccountService::new(self.base_currency.clone()); + let fx_service = CurrencyExchangeService::new(); let account = account_service - .get_account_by_id(&_account_id) + .get_account_by_id(conn, &_account_id) .map_err(|e| e.to_string())?; let file = File::open(&file_path).map_err(|e| e.to_string())?; @@ -184,7 +194,7 @@ impl ActivityService { // Load the symbol profile here, now awaiting the async call let symbol_profile_result = asset_service - .get_asset_profile(&activity_import.symbol, Some(false)) + .get_asset_profile(conn, &activity_import.symbol, Some(false)) .await; // Check if symbol profile is valid @@ -196,9 +206,11 @@ impl ActivityService { // Add exchange rate if the activity currency is different from the account currency let currency = &activity_import.currency; if currency != &account.currency { - match fx_service - .add_exchange_rate(account.currency.clone(), currency.clone()) - { + match fx_service.add_exchange_rate( + conn, + account.currency.clone(), + currency.clone(), + ) { Ok(_) => (), Err(e) => { let error_msg = format!( @@ -234,7 +246,9 @@ impl ActivityService { // Sync quotes for all valid symbols if !symbols_to_sync.is_empty() { - asset_service.sync_symbol_quotes(&symbols_to_sync).await?; + asset_service + .sync_symbol_quotes(conn, &symbols_to_sync) + .await?; } Ok(activities_with_status) @@ -243,9 +257,9 @@ impl ActivityService { // create activities used after the import is verified pub fn create_activities( &self, + conn: &mut SqliteConnection, activities: Vec, ) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); conn.transaction(|conn| { let mut insert_count = 0; for new_activity in activities { @@ -260,17 +274,19 @@ impl ActivityService { } // delete an activity - pub fn delete_activity(&self, activity_id: String) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.repo.delete_activity(&mut conn, activity_id) + pub fn delete_activity( + &self, + conn: &mut SqliteConnection, + activity_id: String, + ) -> Result { + self.repo.delete_activity(conn, activity_id) } pub fn get_activities_by_account_ids( &self, + conn: &mut SqliteConnection, account_ids: &[String], ) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - self.repo - .get_activities_by_account_ids(&mut conn, account_ids) + self.repo.get_activities_by_account_ids(conn, account_ids) } } diff --git a/src-core/src/asset/asset_service.rs b/src-core/src/asset/asset_service.rs index f4236c0..b1d5372 100644 --- a/src-core/src/asset/asset_service.rs +++ b/src-core/src/asset/asset_service.rs @@ -2,12 +2,10 @@ use crate::market_data::market_data_service::MarketDataService; use crate::models::{Asset, AssetProfile, NewAsset, Quote, QuoteSummary}; use crate::schema::{assets, quotes}; use diesel::prelude::*; -use diesel::r2d2::{ConnectionManager, Pool}; use diesel::SqliteConnection; use std::sync::Arc; pub struct AssetService { market_data_service: Arc, - pool: Pool>, } impl From for Quote { @@ -29,35 +27,41 @@ impl From for Quote { } impl AssetService { - pub async fn new(pool: Pool>) -> Self { - let market_data_service = Arc::new(MarketDataService::new(pool.clone()).await); + pub async fn new() -> Self { + let market_data_service = Arc::new(MarketDataService::new().await); Self { - pool, market_data_service, } } - pub fn get_assets(&self) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - assets::table.load::(&mut conn) + pub fn get_assets( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + assets::table.load::(conn) } - pub fn get_asset_by_id(&self, asset_id: &str) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - assets::table.find(asset_id).first::(&mut conn) + pub fn get_asset_by_id( + &self, + conn: &mut SqliteConnection, + asset_id: &str, + ) -> Result { + assets::table.find(asset_id).first::(conn) } - pub fn get_asset_data(&self, asset_id: &str) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - + pub fn get_asset_data( + &self, + conn: &mut SqliteConnection, + asset_id: &str, + ) -> Result { let asset = assets::table .filter(assets::id.eq(asset_id)) - .first::(&mut conn)?; + .first::(conn)?; let quote_history = quotes::table .filter(quotes::symbol.eq(&asset.symbol)) .order(quotes::date.desc()) - .load::(&mut conn)?; + .load::(conn)?; Ok(AssetProfile { asset, @@ -67,15 +71,15 @@ impl AssetService { pub fn load_currency_assets( &self, + conn: &mut SqliteConnection, base_currency: &str, ) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); use crate::schema::assets::dsl::*; assets .filter(asset_type.eq("Currency")) .filter(symbol.like(format!("{}%", base_currency))) - .load::(&mut conn) + .load::(conn) } pub fn create_exchange_rate_symbols( @@ -95,7 +99,7 @@ impl AssetService { let new_assets: Vec = symbols .iter() - .filter(|symbol| self.get_asset_by_id(symbol).is_err()) + .filter(|symbol| self.get_asset_by_id(conn, symbol).is_err()) .map(|symbol| NewAsset { id: symbol.to_string(), isin: None, @@ -189,12 +193,20 @@ impl AssetService { .get_result::(conn) } - pub fn get_latest_quote(&self, symbol_query: &str) -> QueryResult { - self.market_data_service.get_latest_quote(symbol_query) + pub fn get_latest_quote( + &self, + conn: &mut SqliteConnection, + symbol_query: &str, + ) -> QueryResult { + self.market_data_service + .get_latest_quote(conn, symbol_query) } - pub fn get_history_quotes(&self) -> Result, diesel::result::Error> { - self.market_data_service.get_history_quotes() + pub fn get_history_quotes( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + self.market_data_service.get_history_quotes(conn) } pub async fn search_ticker(&self, query: &str) -> Result, String> { @@ -203,15 +215,15 @@ impl AssetService { pub async fn get_asset_profile( &self, + conn: &mut SqliteConnection, asset_id: &str, sync: Option, ) -> Result { - let mut conn = self.pool.get().expect("Couldn't get db connection"); use crate::schema::assets::dsl::*; let should_sync = sync.unwrap_or(true); - match assets.find(asset_id).first::(&mut conn) { + match assets.find(asset_id).first::(conn) { Ok(existing_profile) => Ok(existing_profile), Err(diesel::NotFound) => { // symbol not found in database. Fetching from market data service. @@ -230,10 +242,10 @@ impl AssetService { let inserted_asset = diesel::insert_into(assets) .values(&fetched_profile) .returning(Asset::as_returning()) - .get_result(&mut conn)?; + .get_result(conn)?; if should_sync { - self.sync_symbol_quotes(&[inserted_asset.symbol.clone()]) + self.sync_symbol_quotes(conn, &[inserted_asset.symbol.clone()]) .await .map_err(|e| { println!( @@ -256,7 +268,11 @@ impl AssetService { } } - pub async fn sync_symbol_quotes(&self, symbols: &[String]) -> Result<(), String> { - self.market_data_service.sync_quotes(symbols).await + pub async fn sync_symbol_quotes( + &self, + conn: &mut SqliteConnection, + symbols: &[String], + ) -> Result<(), String> { + self.market_data_service.sync_quotes(conn, symbols).await } } diff --git a/src-core/src/fx/fx_service.rs b/src-core/src/fx/fx_service.rs index 08c9b28..25af723 100644 --- a/src-core/src/fx/fx_service.rs +++ b/src-core/src/fx/fx_service.rs @@ -1,23 +1,39 @@ use crate::fx::fx_repository::FxRepository; use crate::models::ExchangeRate; -use diesel::r2d2::{ConnectionManager, Pool}; use diesel::SqliteConnection; use std::collections::HashMap; use std::sync::{Arc, RwLock}; pub struct CurrencyExchangeService { - pool: Pool>, exchange_rates: Arc>>, } impl CurrencyExchangeService { - pub fn new(pool: Pool>) -> Self { + pub fn new() -> Self { Self { - pool, exchange_rates: Arc::new(RwLock::new(HashMap::new())), } } + pub fn initialize( + &self, + conn: &mut SqliteConnection, + ) -> Result<(), Box> { + let exchange_rates = FxRepository::get_exchange_rates(conn)? + .into_iter() + .map(|rate| { + ( + format!("{}{}", rate.from_currency, rate.to_currency), + rate.rate, + ) + }) + .collect::>(); + + let mut cache = self.exchange_rates.write().map_err(|_| "RwLock poisoned")?; + *cache = exchange_rates; + Ok(()) + } + pub fn get_latest_exchange_rate( &self, from_currency: &str, @@ -29,54 +45,30 @@ impl CurrencyExchangeService { let key = format!("{}{}", from_currency, to_currency); - // Check cache first - { - let cache = self.exchange_rates.read().map_err(|_| "RwLock poisoned")?; - if let Some(&rate) = cache.get(&key) { - return Ok(rate); - } - } - - let mut conn = self.pool.get()?; - - // Try to get the direct rate - if let Some(rate) = FxRepository::get_exchange_rate(&mut conn, from_currency, to_currency)? - { - self.cache_rate(&key, rate.rate)?; - return Ok(rate.rate); + // Check cache + let cache = self.exchange_rates.read().map_err(|_| "RwLock poisoned")?; + if let Some(&rate) = cache.get(&key) { + return Ok(rate); } // If not found, try the inverse rate - if let Some(rate) = FxRepository::get_exchange_rate(&mut conn, to_currency, from_currency)? - { - let inverse_rate = 1.0 / rate.rate; - self.cache_rate(&key, inverse_rate)?; - return Ok(inverse_rate); + let inverse_key = format!("{}{}", to_currency, from_currency); + if let Some(&rate) = cache.get(&inverse_key) { + return Ok(1.0 / rate); } // If still not found, try USD conversion - let from_usd = self.get_usd_rate(&mut conn, from_currency)?; - let to_usd = self.get_usd_rate(&mut conn, to_currency)?; + let from_usd = cache + .get(&format!("{}USD", from_currency)) + .ok_or_else(|| format!("No USD rate found for {}", from_currency))?; + let to_usd = cache + .get(&format!("{}USD", to_currency)) + .ok_or_else(|| format!("No USD rate found for {}", to_currency))?; let rate = from_usd / to_usd; - self.cache_rate(&key, rate)?; Ok(rate) } - fn get_usd_rate( - &self, - conn: &mut SqliteConnection, - currency: &str, - ) -> Result> { - if currency == "USD" { - return Ok(1.0); - } - - FxRepository::get_exchange_rate(conn, currency, "USD")? - .map(|rate| rate.rate) - .ok_or_else(|| format!("No USD rate found for {}", currency).into()) - } - fn cache_rate(&self, key: &str, rate: f64) -> Result<(), Box> { let mut cache = self.exchange_rates.write().map_err(|_| "RwLock poisoned")?; cache.insert(key.to_string(), rate); @@ -99,17 +91,19 @@ impl CurrencyExchangeService { pub fn update_exchange_rate( &self, + conn: &mut SqliteConnection, rate: &ExchangeRate, ) -> Result> { - let mut conn = self.pool.get()?; - let updated_rate = FxRepository::update_exchange_rate(&mut conn, rate)?; + let updated_rate = FxRepository::update_exchange_rate(conn, rate)?; self.cache_rate(&updated_rate.id, updated_rate.rate)?; Ok(updated_rate) } - pub fn get_exchange_rates(&self) -> Result, Box> { - let mut conn = self.pool.get()?; - let rates = FxRepository::get_exchange_rates(&mut conn)?; + pub fn get_exchange_rates( + &self, + conn: &mut SqliteConnection, + ) -> Result, Box> { + let rates = FxRepository::get_exchange_rates(conn)?; Ok(rates) } @@ -125,21 +119,19 @@ impl CurrencyExchangeService { pub fn add_exchange_rate( &self, + conn: &mut SqliteConnection, from: String, to: String, ) -> Result> { - let mut conn = self.pool.get()?; - // Check for direct conversion let direct_id = format!("{}{}=X", from, to); - if let Some(existing_rate) = FxRepository::get_exchange_rate_by_id(&mut conn, &direct_id)? { + if let Some(existing_rate) = FxRepository::get_exchange_rate_by_id(conn, &direct_id)? { return Ok(existing_rate); } // Check for inverse conversion let inverse_id = format!("{}{}=X", to, from); - if let Some(existing_rate) = FxRepository::get_exchange_rate_by_id(&mut conn, &inverse_id)? - { + if let Some(existing_rate) = FxRepository::get_exchange_rate_by_id(conn, &inverse_id)? { return Ok(existing_rate); } @@ -154,8 +146,8 @@ impl CurrencyExchangeService { updated_at: chrono::Utc::now().naive_utc(), }; - let result = self.upsert_exchange_rate(&mut conn, exchange_rate)?; - self.cache_rate(&result.id, result.rate)?; + let result = self.upsert_exchange_rate(conn, exchange_rate)?; + // self.cache_rate(&result.id, result.rate)?; Ok(result) } } diff --git a/src-core/src/market_data/market_data_service.rs b/src-core/src/market_data/market_data_service.rs index d57f2df..8699883 100644 --- a/src-core/src/market_data/market_data_service.rs +++ b/src-core/src/market_data/market_data_service.rs @@ -3,23 +3,20 @@ use crate::providers::yahoo_provider::YahooProvider; use crate::schema::{activities, exchange_rates, quotes}; use chrono::{Duration, NaiveDate, NaiveDateTime, TimeZone, Utc}; use diesel::prelude::*; -use diesel::r2d2::{ConnectionManager, Pool}; use diesel::SqliteConnection; use std::collections::HashMap; use std::time::SystemTime; pub struct MarketDataService { provider: YahooProvider, - pool: Pool>, } impl MarketDataService { - pub async fn new(pool: Pool>) -> Self { + pub async fn new() -> Self { MarketDataService { provider: YahooProvider::new() .await .expect("Failed to initialize YahooProvider"), - pool, } } @@ -30,22 +27,26 @@ impl MarketDataService { .map_err(|e| e.to_string()) } - pub fn get_latest_quote(&self, symbol: &str) -> QueryResult { - let mut conn = self.pool.get().expect("Couldn't get db connection"); + pub fn get_latest_quote( + &self, + conn: &mut SqliteConnection, + symbol: &str, + ) -> QueryResult { quotes::table .filter(quotes::symbol.eq(symbol)) .order(quotes::date.desc()) - .first::(&mut conn) + .first::(conn) } - pub fn get_history_quotes(&self) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - quotes::table.load::(&mut conn) + pub fn get_history_quotes( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + quotes::table.load::(conn) } - pub fn load_quotes(&self) -> HashMap<(String, NaiveDate), Quote> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - let quotes_result: QueryResult> = quotes::table.load::(&mut conn); + pub fn load_quotes(&self, conn: &mut SqliteConnection) -> HashMap<(String, NaiveDate), Quote> { + let quotes_result: QueryResult> = quotes::table.load::(conn); match quotes_result { Ok(quotes) => quotes @@ -62,14 +63,18 @@ impl MarketDataService { } } - pub async fn sync_quotes(&self, symbols: &[String]) -> Result<(), String> { + pub async fn sync_quotes( + &self, + conn: &mut SqliteConnection, + symbols: &[String], + ) -> Result<(), String> { println!("Syncing history quotes for all assets..."); let end_date = SystemTime::now(); let mut all_quotes_to_insert = Vec::new(); for symbol in symbols { let last_sync_date = self - .get_last_quote_sync_date(symbol) + .get_last_quote_sync_date(conn, symbol) .map_err(|e| format!("Error getting last sync date for {}: {}", symbol, e))? .unwrap_or_else(|| Utc::now().naive_utc() - Duration::days(3 * 365)); @@ -87,45 +92,46 @@ impl MarketDataService { } } - self.insert_quotes(&all_quotes_to_insert) + self.insert_quotes(conn, &all_quotes_to_insert) } fn get_last_quote_sync_date( &self, + conn: &mut SqliteConnection, ticker: &str, ) -> Result, diesel::result::Error> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); - quotes::table .filter(quotes::symbol.eq(ticker)) .select(diesel::dsl::max(quotes::date)) - .first::>(&mut conn) + .first::>(conn) .or_else(|_| { activities::table .filter(activities::asset_id.eq(ticker)) .select(diesel::dsl::min(activities::activity_date)) - .first::>(&mut conn) + .first::>(conn) }) } - fn insert_quotes(&self, quotes: &[Quote]) -> Result<(), String> { - let mut conn = self.pool.get().expect("Couldn't get db connection"); + fn insert_quotes(&self, conn: &mut SqliteConnection, quotes: &[Quote]) -> Result<(), String> { diesel::replace_into(quotes::table) .values(quotes) - .execute(&mut conn) + .execute(conn) .map_err(|e| format!("Failed to insert quotes: {}", e))?; Ok(()) } - pub async fn initialize_and_sync_quotes(&self) -> Result<(), String> { + pub async fn initialize_and_sync_quotes( + &self, + conn: &mut SqliteConnection, + ) -> Result<(), String> { use crate::schema::assets::dsl::*; - let conn = &mut self.pool.get().map_err(|e| e.to_string())?; - self.sync_exchange_rates().await?; + let asset_list: Vec = assets .load::(conn) .map_err(|e| format!("Failed to load assets: {}", e))?; self.sync_quotes( + conn, &asset_list .iter() .map(|asset| asset.symbol.clone()) @@ -143,15 +149,17 @@ impl MarketDataService { .map_err(|e| e.to_string()) } - pub fn get_asset_currencies(&self, asset_ids: Vec) -> HashMap { + pub fn get_asset_currencies( + &self, + conn: &mut SqliteConnection, + asset_ids: Vec, + ) -> HashMap { use crate::schema::assets::dsl::*; - let db_connection = &mut self.pool.get().expect("Couldn't get db connection"); - assets .filter(id.eq_any(asset_ids)) .select((id, currency)) - .load::<(String, String)>(db_connection) + .load::<(String, String)>(conn) .map(|results| results.into_iter().collect::>()) .unwrap_or_else(|e| { eprintln!("Error fetching asset currencies: {}", e); @@ -159,13 +167,12 @@ impl MarketDataService { }) } - pub async fn sync_exchange_rates(&self) -> Result<(), String> { + pub async fn sync_exchange_rates(&self, conn: &mut SqliteConnection) -> Result<(), String> { println!("Syncing exchange rates..."); - let mut conn = self.pool.get().expect("Couldn't get db connection"); // Load existing exchange rates let existing_rates: Vec = exchange_rates::table - .load::(&mut conn) + .load::(conn) .map_err(|e| format!("Failed to load existing exchange rates: {}", e))?; let mut updated_rates = Vec::new(); @@ -200,7 +207,7 @@ impl MarketDataService { // Update rates in the database diesel::replace_into(exchange_rates::table) .values(&updated_rates) - .execute(&mut conn) + .execute(conn) .map_err(|e| format!("Failed to update exchange rates: {}", e))?; Ok(()) diff --git a/src-core/src/portfolio/history_service.rs b/src-core/src/portfolio/history_service.rs index 0e904b2..6118e7c 100644 --- a/src-core/src/portfolio/history_service.rs +++ b/src-core/src/portfolio/history_service.rs @@ -6,58 +6,54 @@ use chrono::{Duration, NaiveDate, Utc}; use dashmap::DashMap; use diesel::prelude::*; -use diesel::r2d2::{ConnectionManager, Pool}; - use diesel::SqliteConnection; use rayon::prelude::*; use std::collections::HashMap; use std::sync::Arc; pub struct HistoryService { - pool: Pool>, base_currency: String, market_data_service: Arc, fx_service: CurrencyExchangeService, } impl HistoryService { - pub fn new( - pool: Pool>, - base_currency: String, - market_data_service: Arc, - ) -> Self { + pub fn new(base_currency: String, market_data_service: Arc) -> Self { Self { - pool: pool.clone(), base_currency, market_data_service, - fx_service: CurrencyExchangeService::new(pool), + fx_service: CurrencyExchangeService::new(), } } - pub fn get_account_history(&self, input_account_id: &str) -> Result> { + pub fn get_account_history( + &self, + conn: &mut SqliteConnection, + input_account_id: &str, + ) -> Result> { use crate::schema::portfolio_history::dsl::*; use diesel::prelude::*; - let db_connection = &mut self.pool.get().map_err(PortfolioError::from)?; - let history_data: Vec = portfolio_history .filter(account_id.eq(input_account_id)) .order(date.asc()) - .load::(db_connection)?; + .load::(conn)?; Ok(history_data) } - pub fn get_latest_account_history(&self, input_account_id: &str) -> Result { + pub fn get_latest_account_history( + &self, + conn: &mut SqliteConnection, + input_account_id: &str, + ) -> Result { use crate::schema::portfolio_history::dsl::*; use diesel::prelude::*; - let db_connection = &mut self.pool.get().map_err(PortfolioError::from)?; - let latest_history: PortfolioHistory = portfolio_history .filter(account_id.eq(input_account_id)) .order(date.desc()) - .first(db_connection) + .first(conn) .map_err(|e| PortfolioError::DatabaseError(e))?; Ok(latest_history) @@ -65,22 +61,77 @@ impl HistoryService { pub fn calculate_historical_data( &self, + conn: &mut SqliteConnection, accounts: &[Account], activities: &[Activity], force_full_calculation: bool, ) -> Result> { + self.fx_service + .initialize(conn) + .map_err(|e| PortfolioError::CurrencyConversionError(e.to_string()))?; + let end_date = Utc::now().naive_utc().date(); - let quotes = Arc::new(self.market_data_service.load_quotes()); + let quotes = Arc::new(self.market_data_service.load_quotes(conn)); + + // Preload all necessary data + let account_activities: HashMap> = + activities + .iter() + .cloned() + .fold(HashMap::new(), |mut acc, activity| { + acc.entry(activity.account_id.clone()) + .or_default() + .push(activity); + acc + }); + + let last_historical_dates: HashMap> = accounts + .iter() + .map(|account| { + let last_date = self + .get_last_historical_date(conn, &account.id) + .unwrap_or(None); + (account.id.clone(), last_date) + }) + .collect(); + + let account_currencies: HashMap = accounts + .iter() + .map(|account| { + let currency = self + .get_account_currency(conn, &account.id) + .unwrap_or(self.base_currency.clone()); + (account.id.clone(), currency) + }) + .collect(); + + // Load asset currencies + let asset_ids: Vec = activities.iter().map(|a| a.asset_id.clone()).collect(); + let asset_currencies = self + .market_data_service + .get_asset_currencies(conn, asset_ids); // Process accounts in parallel and collect results + let last_histories: HashMap> = accounts + .iter() + .map(|account| { + let last_history = if force_full_calculation { + None + } else { + self.get_last_portfolio_history(conn, &account.id) + .unwrap_or(None) + }; + (account.id.clone(), last_history) + }) + .collect(); + let summaries_and_histories: Vec<(HistorySummary, Vec)> = accounts .par_iter() .map(|account| { - let account_activities: Vec<_> = activities - .iter() - .filter(|a| a.account_id == account.id) + let account_activities = account_activities + .get(&account.id) .cloned() - .collect(); + .unwrap_or_default(); if account_activities.is_empty() { return ( @@ -101,7 +152,9 @@ impl HistoryService { .min() .unwrap_or_else(|| Utc::now().naive_utc().date()) } else { - self.get_last_historical_date(&account.id) + last_historical_dates + .get(&account.id) + .cloned() .unwrap_or(None) .map(|d| d - Duration::days(1)) .unwrap_or_else(|| { @@ -113,12 +166,22 @@ impl HistoryService { }) }; + let account_currency = account_currencies + .get(&account.id) + .cloned() + .unwrap_or(self.base_currency.clone()); + + let last_history = last_histories.get(&account.id).cloned().unwrap_or(None); + let new_history = self.calculate_historical_value( &account.id, &account_activities, "es, account_start_date, end_date, + account_currency, + &asset_currencies, + last_history, force_full_calculation, ); @@ -150,14 +213,13 @@ impl HistoryService { .collect(); // Save account histories - let db_connection = &mut self.pool.get().map_err(PortfolioError::from)?; - self.save_historical_data(&account_histories, db_connection)?; + self.save_historical_data(&account_histories, conn)?; // Calculate total portfolio history - let total_history = self.calculate_total_portfolio_history_for_all_accounts()?; + let total_history = self.calculate_total_portfolio_history_for_all_accounts(conn)?; // Save total history separately - self.save_historical_data(&total_history, db_connection)?; + self.save_historical_data(&total_history, conn)?; let total_summary = HistorySummary { id: Some("TOTAL".to_string()), @@ -176,23 +238,24 @@ impl HistoryService { Ok(summaries) } - // New method to calculate total portfolio history for all accounts - fn calculate_total_portfolio_history_for_all_accounts(&self) -> Result> { + fn calculate_total_portfolio_history_for_all_accounts( + &self, + conn: &mut SqliteConnection, + ) -> Result> { use crate::schema::accounts::dsl as accounts_dsl; use crate::schema::portfolio_history::dsl::*; - let db_connection = &mut self.pool.get().map_err(PortfolioError::from)?; // Get active account IDs let active_account_ids: Vec = accounts_dsl::accounts .filter(accounts_dsl::is_active.eq(true)) .select(accounts_dsl::id) - .load::(db_connection)?; + .load::(conn)?; let all_histories: Vec = portfolio_history .filter(account_id.ne("TOTAL")) .filter(account_id.eq_any(active_account_ids)) .order(date.asc()) - .load::(db_connection)?; + .load::(conn)?; let grouped_histories: HashMap> = all_histories .into_iter() @@ -266,6 +329,9 @@ impl HistoryService { quotes: &HashMap<(String, NaiveDate), Quote>, start_date: NaiveDate, end_date: NaiveDate, + account_currency: String, + asset_currencies: &HashMap, + last_history: Option, force_full_calculation: bool, ) -> Vec { let max_history_days = 36500; // For example, 100 years @@ -273,16 +339,6 @@ impl HistoryService { let start_date = start_date.max(today - Duration::days(max_history_days)); let end_date = end_date.min(today); - let last_history = if force_full_calculation { - None - } else { - self.get_last_portfolio_history(account_id).unwrap_or(None) - }; - - let account_currency = self - .get_account_currency(account_id) - .unwrap_or(self.base_currency.clone()); - // Initialize values from the last PortfolioHistory or use default values let mut cumulative_cash = last_history.as_ref().map_or(0.0, |h| h.available_cash); let mut net_deposit = last_history.as_ref().map_or(0.0, |h| h.net_deposit); @@ -333,6 +389,7 @@ impl HistoryService { quotes, date, "e_cache, + asset_currencies, &account_currency, ); @@ -434,7 +491,7 @@ impl HistoryService { fn save_historical_data( &self, history_data: &[PortfolioHistory], - db_connection: &mut SqliteConnection, + conn: &mut SqliteConnection, ) -> Result<()> { use crate::schema::portfolio_history::dsl::*; @@ -477,7 +534,7 @@ impl HistoryService { exchange_rate.eq(record.exchange_rate), holdings.eq(&record.holdings), )) - .execute(db_connection) + .execute(conn) .map_err(PortfolioError::from)?; } @@ -490,17 +547,13 @@ impl HistoryService { quotes: &'a HashMap<(String, NaiveDate), Quote>, date: NaiveDate, quote_cache: &DashMap<(String, NaiveDate), Option<&'a Quote>>, + asset_currencies: &HashMap, account_currency: &str, ) -> (f64, f64, f64) { let mut holdings_value = 0.0; let mut day_gain_value = 0.0; let mut opening_market_value = 0.0; - // Fetch all asset currencies at once - let asset_currencies = self - .market_data_service - .get_asset_currencies(holdings.keys().cloned().collect()); - for (asset_id, &quantity) in holdings { if let Some(quote) = self.get_last_available_quote(asset_id, date, quotes, quote_cache) { @@ -523,10 +576,10 @@ impl HistoryService { day_gain_value += day_gain; opening_market_value += opening_value; } else { - println!( - "Warning: No quote found for asset {} on date {}", - asset_id, date - ); + // println!( + // "Warning: No quote found for asset {} on date {}", + // asset_id, date + // ); } } @@ -555,15 +608,15 @@ impl HistoryService { fn get_last_portfolio_history( &self, + conn: &mut SqliteConnection, some_account_id: &str, ) -> Result> { use crate::schema::portfolio_history::dsl::*; - let db_connection = &mut self.pool.get().map_err(PortfolioError::from)?; let last_history_opt = portfolio_history .filter(account_id.eq(some_account_id)) .order(date.desc()) - .first::(db_connection) + .first::(conn) .optional() .map_err(PortfolioError::from)?; @@ -586,15 +639,18 @@ impl HistoryService { days } - fn get_last_historical_date(&self, some_account_id: &str) -> Result> { + fn get_last_historical_date( + &self, + conn: &mut SqliteConnection, + some_account_id: &str, + ) -> Result> { use crate::schema::portfolio_history::dsl::*; - let db_connection = &mut self.pool.get().map_err(PortfolioError::from)?; let last_date_opt = portfolio_history .filter(account_id.eq(some_account_id)) .select(date) .order(date.desc()) - .first::(db_connection) + .first::(conn) .optional() .map_err(PortfolioError::from)?; @@ -608,14 +664,17 @@ impl HistoryService { } // Add this new method to get account currency - fn get_account_currency(&self, account_id: &str) -> Result { + fn get_account_currency( + &self, + conn: &mut SqliteConnection, + account_id: &str, + ) -> Result { use crate::schema::accounts::dsl::*; - let db_connection = &mut self.pool.get().map_err(PortfolioError::from)?; accounts .filter(id.eq(account_id)) .select(currency) - .first::(db_connection) + .first::(conn) .map_err(PortfolioError::from) } } diff --git a/src-core/src/portfolio/holdings_service.rs b/src-core/src/portfolio/holdings_service.rs index ad0b8b1..e071b9f 100644 --- a/src-core/src/portfolio/holdings_service.rs +++ b/src-core/src/portfolio/holdings_service.rs @@ -4,7 +4,6 @@ use crate::asset::asset_service::AssetService; use crate::error::{PortfolioError, Result}; use crate::fx::fx_service::CurrencyExchangeService; use crate::models::{Holding, Performance}; -use diesel::r2d2::{ConnectionManager, Pool}; use diesel::SqliteConnection; use std::collections::{HashMap, HashSet}; @@ -17,25 +16,25 @@ pub struct HoldingsService { } impl HoldingsService { - pub async fn new( - pool: Pool>, - base_currency: String, - ) -> Self { + pub async fn new(base_currency: String) -> Self { HoldingsService { - account_service: AccountService::new(pool.clone(), base_currency.clone()), - activity_service: ActivityService::new(pool.clone(), base_currency.clone()), - asset_service: AssetService::new(pool.clone()).await, - fx_service: CurrencyExchangeService::new(pool.clone()), + account_service: AccountService::new(base_currency.clone()), + activity_service: ActivityService::new(base_currency.clone()), + asset_service: AssetService::new().await, + fx_service: CurrencyExchangeService::new(), base_currency, } } - pub fn compute_holdings(&self) -> Result> { - let start_time = std::time::Instant::now(); + pub fn compute_holdings(&self, conn: &mut SqliteConnection) -> Result> { + let start_time = std::time::Instant::now(); let mut holdings: HashMap = HashMap::new(); - let accounts = self.account_service.get_accounts()?; - let activities = self.activity_service.get_trading_activities()?; - let assets = self.asset_service.get_assets()?; + let accounts = self.account_service.get_accounts(conn)?; + let activities = self.activity_service.get_trading_activities(conn)?; + let assets = self.asset_service.get_assets(conn)?; + self.fx_service + .initialize(conn) + .map_err(|e| PortfolioError::CurrencyConversionError(e.to_string()))?; println!( "Found {} accounts, {} activities, and {} assets", @@ -112,7 +111,7 @@ impl HoldingsService { // Fetch quotes for each symbol asynchronously let mut quotes = HashMap::new(); for symbol in symbols { - match self.asset_service.get_latest_quote(&symbol) { + match self.asset_service.get_latest_quote(conn, &symbol) { Ok(quote) => { quotes.insert(symbol.clone(), quote); } diff --git a/src-core/src/portfolio/income_service.rs b/src-core/src/portfolio/income_service.rs index bfd3902..d082c5c 100644 --- a/src-core/src/portfolio/income_service.rs +++ b/src-core/src/portfolio/income_service.rs @@ -2,32 +2,26 @@ use crate::fx::fx_service::CurrencyExchangeService; use crate::models::{IncomeData, IncomeSummary}; use chrono::{Datelike, NaiveDateTime}; use diesel::prelude::*; -use diesel::r2d2::{ConnectionManager, Pool}; -use diesel::SqliteConnection; use std::collections::HashMap; pub struct IncomeService { fx_service: CurrencyExchangeService, base_currency: String, - pool: Pool>, } impl IncomeService { - pub fn new( - pool: Pool>, - fx_service: CurrencyExchangeService, - base_currency: String, - ) -> Self { + pub fn new(fx_service: CurrencyExchangeService, base_currency: String) -> Self { IncomeService { fx_service, base_currency, - pool, } } - pub fn get_income_data(&self) -> Result, diesel::result::Error> { + pub fn get_income_data( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { use crate::schema::activities; - let mut conn = self.pool.get().expect("Couldn't get db connection"); activities::table .filter(activities::activity_type.eq_any(vec!["DIVIDEND", "INTEREST"])) .select(( @@ -37,7 +31,7 @@ impl IncomeService { activities::quantity * activities::unit_price, activities::currency, )) - .load::<(NaiveDateTime, String, String, f64, String)>(&mut conn) + .load::<(NaiveDateTime, String, String, f64, String)>(conn) .map(|results| { results .into_iter() @@ -52,8 +46,11 @@ impl IncomeService { }) } - pub fn get_income_summary(&self) -> Result { - let income_data = self.get_income_data()?; + pub fn get_income_summary( + &self, + conn: &mut SqliteConnection, + ) -> Result { + let income_data = self.get_income_data(conn)?; let base_currency = self.base_currency.clone(); let mut by_month: HashMap = HashMap::new(); diff --git a/src-core/src/portfolio/portfolio_service.rs b/src-core/src/portfolio/portfolio_service.rs index 98f3e99..43d1871 100644 --- a/src-core/src/portfolio/portfolio_service.rs +++ b/src-core/src/portfolio/portfolio_service.rs @@ -6,8 +6,7 @@ use crate::models::{ AccountSummary, HistorySummary, Holding, IncomeData, IncomeSummary, PortfolioHistory, }; -use diesel::r2d2::{ConnectionManager, Pool}; -use diesel::SqliteConnection; +use diesel::prelude::*; use std::sync::Arc; @@ -25,109 +24,137 @@ pub struct PortfolioService { } impl PortfolioService { - pub async fn new( - pool: Pool>, - base_currency: String, - ) -> Result> { - let market_data_service = Arc::new(MarketDataService::new(pool.clone()).await); + pub async fn new(base_currency: String) -> Result> { + let market_data_service = Arc::new(MarketDataService::new().await); Ok(PortfolioService { - account_service: AccountService::new(pool.clone(), base_currency.clone()), - activity_service: ActivityService::new(pool.clone(), base_currency.clone()), + account_service: AccountService::new(base_currency.clone()), + activity_service: ActivityService::new(base_currency.clone()), market_data_service: market_data_service.clone(), income_service: IncomeService::new( - pool.clone(), - CurrencyExchangeService::new(pool.clone()), + CurrencyExchangeService::new(), base_currency.clone(), ), - holdings_service: HoldingsService::new(pool.clone(), base_currency.clone()).await, - history_service: HistoryService::new( - pool.clone(), - base_currency.clone(), - market_data_service, - ), + holdings_service: HoldingsService::new(base_currency.clone()).await, + history_service: HistoryService::new(base_currency.clone(), market_data_service), }) } - pub async fn compute_holdings(&self) -> Result, Box> { + pub async fn compute_holdings( + &self, + conn: &mut SqliteConnection, + ) -> Result, Box> { self.holdings_service - .compute_holdings() + .compute_holdings(conn) .map_err(|e| Box::new(e) as Box) } pub async fn calculate_historical_data( &self, + conn: &mut SqliteConnection, account_ids: Option>, force_full_calculation: bool, ) -> Result, Box> { // First, sync quotes - self.market_data_service.sync_exchange_rates().await?; + self.market_data_service.sync_exchange_rates(conn).await?; + let accounts = match &account_ids { - Some(ids) => self.account_service.get_accounts_by_ids(ids)?, - None => self.account_service.get_accounts()?, + Some(ids) => self.account_service.get_accounts_by_ids(conn, ids)?, + None => self.account_service.get_accounts(conn)?, }; let activities = match &account_ids { - Some(ids) => self.activity_service.get_activities_by_account_ids(ids)?, - None => self.activity_service.get_activities()?, + Some(ids) => self + .activity_service + .get_activities_by_account_ids(conn, ids)?, + None => self.activity_service.get_activities(conn)?, }; - let results = self.history_service.calculate_historical_data( - &accounts, - &activities, - force_full_calculation, - )?; + let results = conn.transaction(|conn| { + self.history_service.calculate_historical_data( + conn, + &accounts, + &activities, + force_full_calculation, + ) + })?; Ok(results) } - pub fn get_income_data(&self) -> Result, diesel::result::Error> { - self.income_service.get_income_data() + pub fn get_income_data( + &self, + conn: &mut SqliteConnection, + ) -> Result, diesel::result::Error> { + self.income_service.get_income_data(conn) } - pub fn get_income_summary(&self) -> Result { - self.income_service.get_income_summary() + pub fn get_income_summary( + &self, + conn: &mut SqliteConnection, + ) -> Result { + self.income_service.get_income_summary(conn) } pub async fn update_portfolio( &self, + conn: &mut SqliteConnection, ) -> Result, Box> { + use std::time::Instant; + let start = Instant::now(); + // First, sync quotes + println!("initialize_and_sync_quotes"); self.market_data_service - .initialize_and_sync_quotes() + .initialize_and_sync_quotes(conn) .await?; // Then, calculate historical data - self.calculate_historical_data(None, false).await + println!("calculate_historical_data"); + let result = self.calculate_historical_data(conn, None, false).await; + + let duration = start.elapsed(); + println!("update_portfolio completed in: {:?}", duration); + + result } pub fn get_account_history( &self, + conn: &mut SqliteConnection, account_id: &str, ) -> Result, Box> { self.history_service - .get_account_history(account_id) + .get_account_history(conn, account_id) .map_err(|e| Box::new(e) as Box) // Convert PortfolioError to Box } - pub fn get_accounts_summary(&self) -> Result, Box> { - let accounts = self.account_service.get_accounts()?; + pub fn get_accounts_summary( + &self, + conn: &mut SqliteConnection, + ) -> Result, Box> { + let accounts = self.account_service.get_accounts(conn)?; let mut account_summaries = Vec::new(); // First, get the total portfolio value - let total_portfolio_value = - if let Ok(total_history) = self.history_service.get_latest_account_history("TOTAL") { - total_history.market_value - } else { - return Err(Box::new(std::io::Error::new( - std::io::ErrorKind::NotFound, - "Total portfolio history not found", - ))); - }; + let total_portfolio_value = if let Ok(total_history) = self + .history_service + .get_latest_account_history(conn, "TOTAL") + { + total_history.market_value + } else { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Total portfolio history not found", + ))); + }; // Then, calculate the allocation percentage for each account for account in accounts { - if let Ok(history) = self.history_service.get_latest_account_history(&account.id) { + if let Ok(history) = self + .history_service + .get_latest_account_history(conn, &account.id) + { let allocation_percentage = if total_portfolio_value > 0.0 { (history.market_value / total_portfolio_value) * 100.0 } else { diff --git a/src-core/src/providers/yahoo_provider.rs b/src-core/src/providers/yahoo_provider.rs index 85c7f9f..4e2c32e 100644 --- a/src-core/src/providers/yahoo_provider.rs +++ b/src-core/src/providers/yahoo_provider.rs @@ -64,7 +64,7 @@ impl YahooProvider { pub async fn new() -> Result { let provider = yahoo::YahooConnector::new()?; let yahoo_provider = YahooProvider { provider }; - yahoo_provider.set_crumb().await?; + Ok(yahoo_provider) } @@ -154,6 +154,7 @@ impl YahooProvider { } pub async fn fetch_symbol_summary(&self, symbol: &str) -> Result { + self.set_crumb().await?; // Handle the cash asset case if let Some(currency) = symbol.strip_prefix("$CASH-") { return Ok(self.create_cash_asset(symbol, currency)); diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index b407981..56a7386 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -4449,7 +4449,7 @@ dependencies = [ [[package]] name = "wealthfolio-app" -version = "1.0.14" +version = "1.0.15" dependencies = [ "diesel", "dotenvy", diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 2590318..df06927 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "wealthfolio-app" -version = "1.0.14" +version = "1.0.15" description = "Portfolio tracker" authors = ["Aziz Fadil"] license = "LGPL-3.0" diff --git a/src-tauri/src/commands/account.rs b/src-tauri/src/commands/account.rs index 9539d72..c0cdb8c 100644 --- a/src-tauri/src/commands/account.rs +++ b/src-tauri/src/commands/account.rs @@ -7,9 +7,13 @@ use tauri::State; pub async fn get_accounts(state: State<'_, AppState>) -> Result, String> { println!("Fetching active accounts..."); let base_currency = state.base_currency.read().unwrap().clone(); - let service = AccountService::new((*state.pool).clone(), base_currency); + let service = AccountService::new(base_currency); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; service - .get_accounts() + .get_accounts(&mut conn) .map_err(|e| format!("Failed to load accounts: {}", e)) } @@ -19,10 +23,14 @@ pub async fn create_account( state: State<'_, AppState>, ) -> Result { println!("Adding new account..."); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = AccountService::new((*state.pool).clone(), base_currency); + let service = AccountService::new(base_currency); service - .create_account(account) + .create_account(&mut conn, account) .await .map_err(|e| format!("Failed to add new account: {}", e)) } @@ -33,10 +41,14 @@ pub async fn update_account( state: State<'_, AppState>, ) -> Result { println!("Updating account..."); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = AccountService::new((*state.pool).clone(), base_currency); + let service = AccountService::new(base_currency); service - .update_account(account) + .update_account(&mut conn, account) .map_err(|e| format!("Failed to update account: {}", e)) } @@ -47,8 +59,12 @@ pub async fn delete_account( ) -> Result { println!("Deleting account..."); let base_currency = state.base_currency.read().unwrap().clone(); - let service = AccountService::new((*state.pool).clone(), base_currency); + let service = AccountService::new(base_currency); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; service - .delete_account(account_id) + .delete_account(&mut conn, account_id) .map_err(|e| format!("Failed to delete account: {}", e)) } diff --git a/src-tauri/src/commands/activity.rs b/src-tauri/src/commands/activity.rs index 5084c58..190134b 100644 --- a/src-tauri/src/commands/activity.rs +++ b/src-tauri/src/commands/activity.rs @@ -16,11 +16,16 @@ pub async fn search_activities( state: State<'_, AppState>, ) -> Result { println!("Search activities... {}, {}", page, page_size); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = activity_service::ActivityService::new((*state.pool).clone(), base_currency); + let service = activity_service::ActivityService::new(base_currency); service .search_activities( + &mut conn, page, page_size, account_id_filter, @@ -37,10 +42,14 @@ pub async fn create_activity( state: State<'_, AppState>, ) -> Result { println!("Adding new activity..."); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = activity_service::ActivityService::new((*state.pool).clone(), base_currency); + let service = activity_service::ActivityService::new(base_currency); service - .create_activity(activity) + .create_activity(&mut conn, activity) .await .map_err(|e| format!("Failed to add new activity: {}", e)) } @@ -51,10 +60,14 @@ pub async fn update_activity( state: State<'_, AppState>, ) -> Result { println!("Updating activity..."); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = activity_service::ActivityService::new((*state.pool).clone(), base_currency); + let service = activity_service::ActivityService::new(base_currency); service - .update_activity(activity) + .update_activity(&mut conn, activity) .await .map_err(|e| format!("Failed to update activity: {}", e)) } @@ -69,10 +82,14 @@ pub async fn check_activities_import( "Checking activities import...: {}, {}", account_id, file_path ); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = activity_service::ActivityService::new((*state.pool).clone(), base_currency); + let service = activity_service::ActivityService::new(base_currency); service - .check_activities_import(account_id, file_path) + .check_activities_import(&mut conn, account_id, file_path) .await .map_err(|e| e.to_string()) } @@ -83,10 +100,14 @@ pub async fn create_activities( state: State<'_, AppState>, ) -> Result { println!("Importing activities..."); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = activity_service::ActivityService::new((*state.pool).clone(), base_currency); + let service = activity_service::ActivityService::new(base_currency); service - .create_activities(activities) + .create_activities(&mut conn, activities) .map_err(|err| format!("Failed to import activities: {}", err)) } @@ -96,9 +117,13 @@ pub async fn delete_activity( state: State<'_, AppState>, ) -> Result { println!("Deleting activity..."); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; let base_currency = state.base_currency.read().unwrap().clone(); - let service = activity_service::ActivityService::new((*state.pool).clone(), base_currency); + let service = activity_service::ActivityService::new(base_currency); service - .delete_activity(activity_id) + .delete_activity(&mut conn, activity_id) .map_err(|e| format!("Failed to delete activity: {}", e)) } diff --git a/src-tauri/src/commands/market_data.rs b/src-tauri/src/commands/market_data.rs index ab859cb..e44a765 100644 --- a/src-tauri/src/commands/market_data.rs +++ b/src-tauri/src/commands/market_data.rs @@ -6,12 +6,10 @@ use crate::AppState; use tauri::State; #[tauri::command] -pub async fn search_symbol( - query: String, - state: State<'_, AppState>, -) -> Result, String> { +pub async fn search_symbol(query: String) -> Result, String> { println!("Searching for ticker symbol: {}", query); - let service = MarketDataService::new((*state.pool).clone()).await; + let service = MarketDataService::new().await; + service .search_symbol(&query) .await @@ -23,16 +21,26 @@ pub async fn get_asset_data( asset_id: String, state: State<'_, AppState>, ) -> Result { - let service = AssetService::new((*state.pool).clone()).await; - service.get_asset_data(&asset_id).map_err(|e| e.to_string()) + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; + let service = AssetService::new().await; + service + .get_asset_data(&mut conn, &asset_id) + .map_err(|e| e.to_string()) } #[tauri::command] pub async fn synch_quotes(state: State<'_, AppState>) -> Result<(), String> { println!("Synching quotes history"); - let service = MarketDataService::new((*state.pool).clone()).await; + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; + let service = MarketDataService::new().await; service - .initialize_and_sync_quotes() + .initialize_and_sync_quotes(&mut conn) .await .map_err(|e| e.to_string()) } diff --git a/src-tauri/src/commands/portfolio.rs b/src-tauri/src/commands/portfolio.rs index 77a23e8..16984c2 100644 --- a/src-tauri/src/commands/portfolio.rs +++ b/src-tauri/src/commands/portfolio.rs @@ -6,7 +6,7 @@ use tauri::State; async fn create_portfolio_service(state: &State<'_, AppState>) -> Result { let base_currency = state.base_currency.read().unwrap().clone(); - PortfolioService::new((*state.pool).clone(), base_currency) + PortfolioService::new(base_currency) .await .map_err(|e| format!("Failed to create PortfolioService: {}", e)) } @@ -19,23 +19,33 @@ pub async fn calculate_historical_data( ) -> Result, String> { println!("Calculate portfolio historical..."); let service = create_portfolio_service(&state).await?; + let mut conn = state.pool.get().map_err(|e| e.to_string())?; service - .calculate_historical_data(account_ids, force_full_calculation) + .calculate_historical_data(&mut conn, account_ids, force_full_calculation) .await .map_err(|e| e.to_string()) } #[tauri::command] pub async fn compute_holdings(state: State<'_, AppState>) -> Result, String> { + use std::time::Instant; println!("Compute holdings..."); + let start = Instant::now(); + let service = create_portfolio_service(&state).await?; + let mut conn = state.pool.get().map_err(|e| e.to_string())?; - service - .compute_holdings() + let result = service + .compute_holdings(&mut conn) .await .map_err(|e| e.to_string()) - .map(|vec| Ok(vec))? + .map(|vec| Ok(vec))?; + + let duration = start.elapsed(); + println!("Compute holdings completed in: {:?}", duration); + + result } #[tauri::command] @@ -45,9 +55,10 @@ pub async fn get_account_history( ) -> Result, String> { println!("Fetching account history for account ID: {}", account_id); let service = create_portfolio_service(&state).await?; + let mut conn = state.pool.get().map_err(|e| e.to_string())?; service - .get_account_history(&account_id) + .get_account_history(&mut conn, &account_id) .map_err(|e| format!("Failed to fetch account history: {}", e)) } @@ -57,9 +68,10 @@ pub async fn get_accounts_summary( ) -> Result, String> { println!("Fetching active accounts performance..."); let service = create_portfolio_service(&state).await?; + let mut conn = state.pool.get().map_err(|e| e.to_string())?; service - .get_accounts_summary() + .get_accounts_summary(&mut conn) .map_err(|e| format!("Failed to fetch active accounts performance: {}", e)) } @@ -69,9 +81,10 @@ pub async fn recalculate_portfolio( ) -> Result, String> { println!("Recalculating portfolio..."); let service = create_portfolio_service(&state).await?; + let mut conn = state.pool.get().map_err(|e| e.to_string())?; service - .update_portfolio() + .update_portfolio(&mut conn) .await .map_err(|e| format!("Failed to recalculate portfolio: {}", e)) } @@ -80,8 +93,9 @@ pub async fn recalculate_portfolio( pub async fn get_income_summary(state: State<'_, AppState>) -> Result { println!("Fetching income summary..."); let service = create_portfolio_service(&state).await?; + let mut conn = state.pool.get().map_err(|e| e.to_string())?; service - .get_income_summary() + .get_income_summary(&mut conn) .map_err(|e| format!("Failed to fetch income summary: {}", e)) } diff --git a/src-tauri/src/commands/settings.rs b/src-tauri/src/commands/settings.rs index e8541aa..6838c00 100644 --- a/src-tauri/src/commands/settings.rs +++ b/src-tauri/src/commands/settings.rs @@ -51,17 +51,25 @@ pub async fn update_exchange_rate( state: State<'_, AppState>, ) -> Result { println!("Updating exchange rate..."); - let fx_service = CurrencyExchangeService::new((*state.pool).clone()); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; + let fx_service = CurrencyExchangeService::new(); fx_service - .update_exchange_rate(&rate) + .update_exchange_rate(&mut conn, &rate) .map_err(|e| format!("Failed to update exchange rate: {}", e)) } #[tauri::command] pub async fn get_exchange_rates(state: State<'_, AppState>) -> Result, String> { println!("Fetching exchange rates..."); - let fx_service = CurrencyExchangeService::new((*state.pool).clone()); + let mut conn = state + .pool + .get() + .map_err(|e| format!("Failed to get connection: {}", e))?; + let fx_service = CurrencyExchangeService::new(); fx_service - .get_exchange_rates() + .get_exchange_rates(&mut conn) .map_err(|e| format!("Failed to load exchange rates: {}", e)) } diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index fbfdd64..68c1ea9 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -46,6 +46,7 @@ use tauri::{api::dialog, CustomMenuItem, Manager, Menu, Submenu}; type DbPool = r2d2::Pool>; // AppState +#[derive(Clone)] struct AppState { pool: Arc, base_currency: Arc>, @@ -68,6 +69,7 @@ fn main() { // Create connection pool let manager = ConnectionManager::::new(&db_path); let pool = r2d2::Pool::builder() + .max_size(5) .build(manager) .expect("Failed to create database connection pool"); let pool = Arc::new(pool); @@ -84,9 +86,9 @@ fn main() { pool: pool.clone(), base_currency: Arc::new(RwLock::new(base_currency)), }; - app.manage(state); + app.manage(state.clone()); - spawn_quote_sync(app_handle, pool); + spawn_quote_sync(app_handle, state); Ok(()) }) @@ -147,32 +149,43 @@ fn handle_menu_event(event: tauri::WindowMenuEvent) { } } -fn spawn_quote_sync(app_handle: tauri::AppHandle, pool: Arc) { +fn spawn_quote_sync(app_handle: tauri::AppHandle, state: AppState) { spawn(async move { let base_currency = { - let state = app_handle.state::(); let currency = state.base_currency.read().unwrap().clone(); currency }; - let portfolio_service = portfolio::PortfolioService::new((*pool).clone(), base_currency) - .await - .expect("Failed to create PortfolioService"); + let portfolio_service = match portfolio::PortfolioService::new(base_currency).await { + Ok(service) => service, + Err(e) => { + eprintln!("Failed to create PortfolioService: {}", e); + if let Err(emit_err) = app_handle.emit_all( + "PORTFOLIO_SERVICE_ERROR", + "Failed to initialize PortfolioService", + ) { + eprintln!("Failed to emit PORTFOLIO_SERVICE_ERROR event: {}", emit_err); + } + return; + } + }; app_handle .emit_all("PORTFOLIO_UPDATE_START", ()) .expect("Failed to emit event"); - match portfolio_service.update_portfolio().await { + let mut conn = state.pool.get().expect("Failed to get database connection"); + + match portfolio_service.update_portfolio(&mut conn).await { Ok(_) => { - app_handle - .emit_all("PORTFOLIO_UPDATE_COMPLETE", ()) - .expect("Failed to emit event"); + if let Err(e) = app_handle.emit_all("PORTFOLIO_UPDATE_COMPLETE", ()) { + eprintln!("Failed to emit PORTFOLIO_UPDATE_COMPLETE event: {}", e); + } } Err(e) => { eprintln!("Failed to update portfolio: {}", e); - app_handle - .emit_all("PORTFOLIO_UPDATE_ERROR", ()) - .expect("Failed to emit event"); + if let Err(e) = app_handle.emit_all("PORTFOLIO_UPDATE_ERROR", &e.to_string()) { + eprintln!("Failed to emit PORTFOLIO_UPDATE_ERROR event: {}", e); + } } } }); diff --git a/src-tauri/tauri.conf.json b/src-tauri/tauri.conf.json index 3ac741e..0ab1f5a 100644 --- a/src-tauri/tauri.conf.json +++ b/src-tauri/tauri.conf.json @@ -8,7 +8,7 @@ }, "package": { "productName": "Wealthfolio", - "version": "1.0.14" + "version": "1.0.15" }, "tauri": { "allowlist": { diff --git a/src/adapters/index.ts b/src/adapters/index.ts index 417d3f8..892afde 100644 --- a/src/adapters/index.ts +++ b/src/adapters/index.ts @@ -3,7 +3,7 @@ export enum RUN_ENV { MOBILE = 'mobile', BROWSER = 'browser', UNSUPPORTED = 'unsupported', -}; +} export const getRunEnv = (): RUN_ENV => { if (typeof window !== 'undefined' && window.__TAURI__) { @@ -13,12 +13,9 @@ export const getRunEnv = (): RUN_ENV => { return RUN_ENV.BROWSER; } return RUN_ENV.UNSUPPORTED; -} +}; -export type { - EventCallback, - UnlistenFn, -} from './tauri'; +export type { EventCallback, UnlistenFn } from './tauri'; export { invokeTauri, @@ -28,4 +25,5 @@ export { listenFileDropCancelledTauri, listenQuotesSyncStartTauri, listenQuotesSyncCompleteTauri, + listenQuotesSyncErrorTauri, } from './tauri'; diff --git a/src/adapters/tauri.ts b/src/adapters/tauri.ts index 39798a1..a86a19c 100644 --- a/src/adapters/tauri.ts +++ b/src/adapters/tauri.ts @@ -40,3 +40,9 @@ export const listenQuotesSyncCompleteTauri = async ( ): Promise => { return listen('PORTFOLIO_UPDATE_COMPLETE', handler); }; + +export const listenQuotesSyncErrorTauri = async ( + handler: EventCallback, +): Promise => { + return listen('PORTFOLIO_UPDATE_ERROR', handler); +}; diff --git a/src/commands/quote-listener.ts b/src/commands/quote-listener.ts index 03ac53d..5a1a6a9 100644 --- a/src/commands/quote-listener.ts +++ b/src/commands/quote-listener.ts @@ -4,6 +4,7 @@ import { RUN_ENV, listenQuotesSyncStartTauri, listenQuotesSyncCompleteTauri, + listenQuotesSyncErrorTauri, } from '@/adapters'; // listenQuotesSyncStart @@ -37,3 +38,18 @@ export const listenQuotesSyncComplete = async ( throw error; } }; + +// listenQuotesSyncError +export const listenQuotesSyncError = async (handler: EventCallback): Promise => { + try { + switch (getRunEnv()) { + case RUN_ENV.DESKTOP: + return listenQuotesSyncErrorTauri(handler); + default: + throw new Error(`Unsupported`); + } + } catch (error) { + console.error('Error listen PORTFOLIO_UPDATE_ERROR:', error); + throw error; + } +}; diff --git a/src/hooks/useCalculateHistory.ts b/src/hooks/useCalculateHistory.ts index 8f2c46b..18c85d1 100644 --- a/src/hooks/useCalculateHistory.ts +++ b/src/hooks/useCalculateHistory.ts @@ -24,8 +24,9 @@ export function useCalculateHistoryMutation({ variant: 'success', }); }, - onError: () => { + onError: (error) => { queryClient.invalidateQueries(); + console.error(error); toast({ title: errorTitle, description: 'Please try again or report an issue if the problem persists.', diff --git a/src/pages/dashboard/dashboard-page.tsx b/src/pages/dashboard/dashboard-page.tsx index eeaff3c..12bf2c8 100644 --- a/src/pages/dashboard/dashboard-page.tsx +++ b/src/pages/dashboard/dashboard-page.tsx @@ -27,6 +27,11 @@ function DashboardSkeleton() { } export default function DashboardPage() { + const { data: accounts, isLoading: isAccountsLoading } = useQuery({ + queryKey: [QueryKeys.ACCOUNTS_SUMMARY], + queryFn: getAccountsSummary, + }); + const { data: portfolioHistory, isLoading: isPortfolioHistoryLoading } = useQuery< PortfolioHistory[], Error @@ -35,11 +40,6 @@ export default function DashboardPage() { queryFn: () => getAccountHistory('TOTAL'), }); - const { data: accounts, isLoading: isAccountsLoading } = useQuery({ - queryKey: [QueryKeys.ACCOUNTS_SUMMARY], - queryFn: getAccountsSummary, - }); - if (isPortfolioHistoryLoading || isAccountsLoading) { return ; } diff --git a/src/useGlobalEventListener.ts b/src/useGlobalEventListener.ts index 36965f5..6c9c4d3 100644 --- a/src/useGlobalEventListener.ts +++ b/src/useGlobalEventListener.ts @@ -2,7 +2,11 @@ import { useEffect } from 'react'; import { useQueryClient } from '@tanstack/react-query'; import { toast } from '@/components/ui/use-toast'; -import { listenQuotesSyncComplete, listenQuotesSyncStart } from '@/commands/quote-listener'; +import { + listenQuotesSyncComplete, + listenQuotesSyncStart, + listenQuotesSyncError, +} from '@/commands/quote-listener'; const useGlobalEventListener = () => { const queryClient = useQueryClient(); @@ -24,13 +28,26 @@ const useGlobalEventListener = () => { duration: 5000, }); }; + + const handleQuotesSyncError = (error: string) => { + toast({ + title: 'Portfolio Update Error', + description: error, + duration: 5000, + variant: 'destructive', + }); + }; const setupListeners = async () => { const unlistenSyncStart = await listenQuotesSyncStart(handleQuoteSyncStart); const unlistenSyncComplete = await listenQuotesSyncComplete(handleQuotesSyncComplete); + const unlistenSyncError = await listenQuotesSyncError((event) => { + handleQuotesSyncError(event.payload as string); + }); return () => { unlistenSyncStart(); unlistenSyncComplete(); + unlistenSyncError(); }; };