diff --git a/package-lock.json b/package-lock.json
index 9a0f782b0..47ef4699a 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -14,17 +14,17 @@
         "@chakra-ui/theme-tools": "^2.2.6",
         "@emotion/react": "^11.11.4",
         "@emotion/styled": "^11.11.5",
-        "@tauri-apps/api": "^2.7.0",
+        "@tauri-apps/api": "^2.8.0",
         "@tauri-apps/plugin-clipboard-manager": "^2.3.0",
-        "@tauri-apps/plugin-deep-link": "^2.4.0",
-        "@tauri-apps/plugin-dialog": "^2.3.0",
-        "@tauri-apps/plugin-fs": "^2.4.0",
-        "@tauri-apps/plugin-http": "^2.5.0",
+        "@tauri-apps/plugin-deep-link": "^2.4.1",
+        "@tauri-apps/plugin-dialog": "^2.3.3",
+        "@tauri-apps/plugin-fs": "^2.4.2",
+        "@tauri-apps/plugin-http": "^2.5.2",
         "@tauri-apps/plugin-log": "^2.6.0",
-        "@tauri-apps/plugin-opener": "^2.4.0",
-        "@tauri-apps/plugin-os": "^2.3.0",
+        "@tauri-apps/plugin-opener": "^2.5.0",
+        "@tauri-apps/plugin-os": "^2.3.1",
         "@tauri-apps/plugin-process": "^2.3.0",
-        "@tauri-apps/plugin-window-state": "^2.3.0",
+        "@tauri-apps/plugin-window-state": "^2.4.0",
         "i18next": "^24.0.0",
         "lodash": "^4.17.21",
         "masonic": "^4.0.1",
@@ -40,7 +40,7 @@
       },
       "devDependencies": {
         "@next/bundle-analyzer": "^15.1.2",
-        "@tauri-apps/cli": "^2.7.0",
+        "@tauri-apps/cli": "^2.8.1",
         "@trivago/prettier-plugin-sort-imports": "^4.3.0",
         "@types/lodash": "^4.17.15",
         "@types/micromatch": "^4.0.9",
diff --git a/package.json b/package.json
index 53f009ef2..b89d638c9 100644
--- a/package.json
+++ b/package.json
@@ -35,17 +35,17 @@
     "@chakra-ui/theme-tools": "^2.2.6",
     "@emotion/react": "^11.11.4",
     "@emotion/styled": "^11.11.5",
-    "@tauri-apps/api": "^2.7.0",
+    "@tauri-apps/api": "^2.8.0",
     "@tauri-apps/plugin-clipboard-manager": "^2.3.0",
-    "@tauri-apps/plugin-deep-link": "^2.4.0",
-    "@tauri-apps/plugin-dialog": "^2.3.0",
-    "@tauri-apps/plugin-fs": "^2.4.0",
-    "@tauri-apps/plugin-http": "^2.5.0",
+    "@tauri-apps/plugin-deep-link": "^2.4.1",
+    "@tauri-apps/plugin-dialog": "^2.3.3",
+    "@tauri-apps/plugin-fs": "^2.4.2",
+    "@tauri-apps/plugin-http": "^2.5.2",
     "@tauri-apps/plugin-log": "^2.6.0",
-    "@tauri-apps/plugin-opener": "^2.4.0",
-    "@tauri-apps/plugin-os": "^2.3.0",
+    "@tauri-apps/plugin-opener": "^2.5.0",
+    "@tauri-apps/plugin-os": "^2.3.1",
     "@tauri-apps/plugin-process": "^2.3.0",
-    "@tauri-apps/plugin-window-state": "^2.3.0",
+    "@tauri-apps/plugin-window-state": "^2.4.0",
     "i18next": "^24.0.0",
     "lodash": "^4.17.21",
     "masonic": "^4.0.1",
@@ -61,7 +61,7 @@
   },
   "devDependencies": {
     "@next/bundle-analyzer": "^15.1.2",
-    "@tauri-apps/cli": "^2.7.0",
+    "@tauri-apps/cli": "^2.8.1",
     "@trivago/prettier-plugin-sort-imports": "^4.3.0",
     "@types/lodash": "^4.17.15",
     "@types/micromatch": "^4.0.9",
diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock
index ad84981fa..86a64bcd4 100644
--- a/src-tauri/Cargo.lock
+++ b/src-tauri/Cargo.lock
@@ -56,6 +56,7 @@ dependencies = [
  "tauri-plugin-single-instance",
  "tauri-plugin-window-state",
  "tauri-utils",
+ "thiserror 1.0.69",
  "tokio",
  "tokio-util",
  "toml 0.8.23",
diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml
index 0322ed79f..ad1e53afd 100644
--- a/src-tauri/Cargo.toml
+++ b/src-tauri/Cargo.toml
@@ -78,6 +78,7 @@
 font-loader = "0.11.0"
 async-speed-limit = "0.4.2"
 reqwest-middleware = "0.4.2"
 reqwest-retry = "0.7.0"
+thiserror = "1.0"
 murmur2 = "0.1"
 csv = "1.3"
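Editor's note on the new `thiserror` dependency: the rewritten task monitor below imports its derive macro (`use thiserror::Error;` in `src-tauri/src/tasks/monitor.rs`). For readers unfamiliar with the crate, here is a minimal sketch of the pattern it enables; the enum and its variants are illustrative only, not the project's actual error type:

```rust
use thiserror::Error;

// Illustrative only: thiserror derives std::error::Error and Display.
#[derive(Debug, Error)]
enum TaskError {
    // The #[error] attribute supplies the Display format string.
    #[error("download failed: {0}")]
    Download(String),
    // #[from] additionally derives From<std::io::Error>, so `?` conversions work.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}
```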
diff --git a/src-tauri/src/instance/commands.rs b/src-tauri/src/instance/commands.rs
index 8c4491ba8..0bea43821 100644
--- a/src-tauri/src/instance/commands.rs
+++ b/src-tauri/src/instance/commands.rs
@@ -48,7 +48,7 @@ use crate::{
     models::{GameClientResourceInfo, ModLoaderResourceInfo},
   },
   storage::{load_json_async, save_json_async, Storage},
-  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, PTaskParam},
+  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, RuntimeTaskParam},
   utils::{fs::create_url_shortcut, image::ImageWrapper},
 };
 use lazy_static::lazy_static;
@@ -883,7 +883,7 @@ pub async fn create_instance(
     minimum_launcher_version: version_info.minimum_launcher_version,
   });
 
-  let mut task_params = Vec::<PTaskParam>::new();
+  let mut task_params = Vec::<RuntimeTaskParam>::new();
 
   // Download client (use task)
   let client_download_info = version_info
@@ -891,7 +891,7 @@ pub async fn create_instance(
     .get("client")
     .ok_or(InstanceError::ClientJsonParseError)?;
 
-  task_params.push(PTaskParam::Download(DownloadParam {
+  task_params.push(RuntimeTaskParam::Download(DownloadParam {
     src: Url::parse(&client_download_info.url.clone())
       .map_err(|_| InstanceError::ClientJsonParseError)?,
     dest: instance.version_path.join(format!("{}.jar", name)),
diff --git a/src-tauri/src/instance/helpers/loader/common.rs b/src-tauri/src/instance/helpers/loader/common.rs
index 7de6baf9d..005aeb754 100644
--- a/src-tauri/src/instance/helpers/loader/common.rs
+++ b/src-tauri/src/instance/helpers/loader/common.rs
@@ -21,7 +21,7 @@ use crate::{
   error::SJMCLResult,
   instance::models::misc::{ModLoader, ModLoaderType},
   resource::models::SourceType,
-  tasks::PTaskParam,
+  tasks::RuntimeTaskParam,
 };
 
 pub fn add_library_entry(
@@ -69,7 +69,7 @@ pub async fn install_mod_loader(
   lib_dir: PathBuf,
   mods_dir: PathBuf,
   client_info: &mut McClientInfo,
-  task_params: &mut Vec<PTaskParam>,
+  task_params: &mut Vec<RuntimeTaskParam>,
 ) -> SJMCLResult<()> {
   match loader.loader_type {
     ModLoaderType::Fabric => {
diff --git a/src-tauri/src/instance/helpers/loader/fabric.rs b/src-tauri/src/instance/helpers/loader/fabric.rs
index 609ea4da2..20a0d0dd7 100644
--- a/src-tauri/src/instance/helpers/loader/fabric.rs
+++ b/src-tauri/src/instance/helpers/loader/fabric.rs
@@ -15,7 +15,7 @@ use crate::{
     helpers::misc::get_download_api,
     models::{ResourceType, SourceType},
   },
-  tasks::{download::DownloadParam, PTaskParam},
+  tasks::{download::DownloadParam, RuntimeTaskParam},
 };
 
 pub async fn install_fabric_loader(
@@ -26,7 +26,7 @@ pub async fn install_fabric_loader(
   lib_dir: PathBuf,
   mods_dir: PathBuf,
   client_info: &mut McClientInfo,
-  task_params: &mut Vec<PTaskParam>,
+  task_params: &mut Vec<RuntimeTaskParam>,
 ) -> SJMCLResult<()> {
   let client = app.state::<reqwest::Client>();
   let loader_ver = &loader.version;
@@ -84,7 +84,7 @@ pub async fn install_fabric_loader(
       &[ResourceType::FabricMaven, ResourceType::Libraries],
       &priority[0],
     )?;
-    task_params.push(PTaskParam::Download(DownloadParam {
+    task_params.push(RuntimeTaskParam::Download(DownloadParam {
       src,
       dest: lib_dir.join(&rel),
       filename: None,
@@ -113,7 +113,7 @@ pub async fn install_fabric_loader(
   if let Ok(Some(fabric_api_download)) =
     get_latest_fabric_api_mod_download(&app, game_version, mods_dir).await
   {
-    task_params.push(PTaskParam::Download(fabric_api_download));
+    task_params.push(RuntimeTaskParam::Download(fabric_api_download));
   }
 
   Ok(())
diff --git a/src-tauri/src/instance/helpers/loader/forge.rs b/src-tauri/src/instance/helpers/loader/forge.rs
index 2ebdc5e38..6b199af62 100644
--- a/src-tauri/src/instance/helpers/loader/forge.rs
+++ b/src-tauri/src/instance/helpers/loader/forge.rs
@@ -26,7 +26,7 @@ use crate::{
     helpers::misc::get_download_api,
     models::{ResourceType, SourceType},
   },
-  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, PTaskParam},
+  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, RuntimeTaskParam},
 };
 
 async fn fetch_bmcl_forge_installer_url(
@@ -58,7 +58,7 @@ pub async fn install_forge_loader(
   game_version: &str,
   loader: &ModLoader,
   lib_dir: PathBuf,
-  task_params: &mut Vec<PTaskParam>,
+  task_params: &mut Vec<RuntimeTaskParam>,
 ) -> SJMCLResult<()> {
   let loader_ver = &loader.version;
 
@@ -88,7 +88,7 @@ pub async fn install_forge_loader(
   let installer_rel = convert_library_name_to_path(&installer_coord, None)?;
   let installer_path = lib_dir.join(&installer_rel);
 
-  task_params.push(PTaskParam::Download(DownloadParam {
+  task_params.push(RuntimeTaskParam::Download(DownloadParam {
     src: installer_url,
     dest: installer_path.clone(),
     filename: None,
@@ -228,7 +228,7 @@ pub async fn download_forge_libraries(
   if processor.args.contains(&"DOWNLOAD_MOJMAPS".to_string()) {
     if let Some(mojmaps) = args_map.get("{MOJMAPS}") {
       if let Some(client_mappings) = client_info.downloads.get("client_mappings") {
-        task_params.push(PTaskParam::Download(DownloadParam {
+        task_params.push(RuntimeTaskParam::Download(DownloadParam {
           src: client_mappings.url.parse()?,
           dest: lib_dir.join(mojmaps),
           filename: None,
@@ -301,7 +301,7 @@ pub async fn download_forge_libraries(
       continue;
     }
 
-    task_params.push(PTaskParam::Download(DownloadParam {
+    task_params.push(RuntimeTaskParam::Download(DownloadParam {
       src: convert_url_to_target_source(
         &Url::parse(url)?,
         &[
@@ -357,7 +357,7 @@ pub async fn download_forge_libraries(
     }
 
     let rel = convert_library_name_to_path(&name.to_string(), None)?;
-    task_params.push(PTaskParam::Download(DownloadParam {
+    task_params.push(RuntimeTaskParam::Download(DownloadParam {
       src: convert_url_to_target_source(
         &Url::parse(url)?,
         &[
@@ -437,7 +437,7 @@ pub async fn download_forge_libraries(
       ],
       &priority[0],
     )?;
-    task_params.push(PTaskParam::Download(DownloadParam {
+    task_params.push(RuntimeTaskParam::Download(DownloadParam {
      src,
      dest: lib_dir.join(&rel),
      filename: None,
@@ -449,7 +449,7 @@ pub async fn download_forge_libraries(
 
   let mut seen = std::collections::HashSet::new();
   task_params.retain(|param| match param {
-    PTaskParam::Download(dp) => seen.insert(dp.dest.clone()),
+    RuntimeTaskParam::Download(dp) => seen.insert(dp.dest.clone()),
   });
 
   schedule_progressive_task_group(
diff --git a/src-tauri/src/instance/helpers/loader/neoforge.rs b/src-tauri/src/instance/helpers/loader/neoforge.rs
index b5acc038d..d2edadc77 100644
--- a/src-tauri/src/instance/helpers/loader/neoforge.rs
+++ b/src-tauri/src/instance/helpers/loader/neoforge.rs
@@ -20,14 +20,14 @@ use crate::{
     helpers::misc::get_download_api,
     models::{ResourceType, SourceType},
   },
-  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, PTaskParam},
+  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, RuntimeTaskParam},
 };
 
 pub async fn install_neoforge_loader(
   priority: &[SourceType],
   loader: &ModLoader,
   lib_dir: PathBuf,
-  task_params: &mut Vec<PTaskParam>,
+  task_params: &mut Vec<RuntimeTaskParam>,
 ) -> SJMCLResult<()> {
   let loader_ver = &loader.version;
 
@@ -62,7 +62,7 @@ pub async fn install_neoforge_loader(
   let installer_rel = convert_library_name_to_path(&installer_coord, None)?;
   let installer_path = lib_dir.join(&installer_rel);
 
-  task_params.push(PTaskParam::Download(DownloadParam {
+  task_params.push(RuntimeTaskParam::Download(DownloadParam {
     src: installer_url,
     dest: installer_path.clone(),
     filename: None,
@@ -205,7 +205,7 @@ pub async fn download_neoforge_libraries(
   if processor.args.contains(&"DOWNLOAD_MOJMAPS".to_string()) {
     if let Some(mojmaps) = args_map.get("{MOJMAPS}") {
       if let Some(client_mappings) = client_info.downloads.get("client_mappings") {
-        task_params.push(PTaskParam::Download(DownloadParam {
+        task_params.push(RuntimeTaskParam::Download(DownloadParam {
           src: client_mappings.url.parse()?,
           dest: lib_dir.join(mojmaps),
           filename: None,
@@ -278,7 +278,7 @@ pub async fn download_neoforge_libraries(
       continue;
     }
 
-    task_params.push(PTaskParam::Download(DownloadParam {
+    task_params.push(RuntimeTaskParam::Download(DownloadParam {
       src: convert_url_to_target_source(
         &Url::parse(url)?,
         &[ResourceType::NeoforgeMaven, ResourceType::Libraries],
@@ -326,7 +326,7 @@ pub async fn download_neoforge_libraries(
     }
 
     let rel = convert_library_name_to_path(&name.to_string(), None)?;
-    task_params.push(PTaskParam::Download(DownloadParam {
+    task_params.push(RuntimeTaskParam::Download(DownloadParam {
       src: convert_url_to_target_source(
         &Url::parse(url)?,
         &[ResourceType::NeoforgeMaven, ResourceType::Libraries],
@@ -340,7 +340,7 @@ pub async fn download_neoforge_libraries(
 
   let mut seen = std::collections::HashSet::new();
   task_params.retain(|param| match param {
-    PTaskParam::Download(dp) => seen.insert(dp.dest.clone()),
+    RuntimeTaskParam::Download(dp) => seen.insert(dp.dest.clone()),
   });
 
   schedule_progressive_task_group(
diff --git a/src-tauri/src/instance/helpers/modpack/curseforge.rs b/src-tauri/src/instance/helpers/modpack/curseforge.rs
index b307533e2..59e8ab905 100644
--- a/src-tauri/src/instance/helpers/modpack/curseforge.rs
+++ b/src-tauri/src/instance/helpers/modpack/curseforge.rs
@@ -14,7 +14,7 @@ use crate::{
   error::{SJMCLError, SJMCLResult},
   instance::models::misc::{InstanceError, ModLoaderType},
   resource::helpers::curseforge::misc::CurseForgeProject,
-  tasks::{download::DownloadParam, PTaskParam},
+  tasks::{download::DownloadParam, RuntimeTaskParam},
 };
 
 #[derive(Deserialize, Serialize, Debug, Clone)]
@@ -141,7 +141,7 @@ impl CurseForgeManifest {
     &self,
     app: &AppHandle,
     instance_path: &Path,
-  ) -> SJMCLResult<Vec<PTaskParam>> {
+  ) -> SJMCLResult<Vec<RuntimeTaskParam>> {
     let client = app.state::<reqwest::Client>();
     let instance_path = instance_path.to_path_buf();
@@ -198,7 +198,7 @@ impl CurseForgeManifest {
           .and_then(|hs| hs.iter().find(|h| h.algo == 1))
           .map(|h| h.value.clone());
 
-        let task_param = PTaskParam::Download(DownloadParam {
+        let task_param = RuntimeTaskParam::Download(DownloadParam {
           src: url::Url::parse(&download_url).map_err(|_| InstanceError::InvalidSourcePath)?,
           sha1,
           dest: instance_path
@@ -211,7 +211,7 @@ impl CurseForgeManifest {
           filename: Some(file_manifest.data.file_name.clone()),
         });
 
-        Ok::<PTaskParam, SJMCLError>(task_param)
+        Ok::<RuntimeTaskParam, SJMCLError>(task_param)
       }
     });
diff --git a/src-tauri/src/instance/helpers/modpack/modrinth.rs b/src-tauri/src/instance/helpers/modpack/modrinth.rs
index cbf9fd86d..487ee8f93 100644
--- a/src-tauri/src/instance/helpers/modpack/modrinth.rs
+++ b/src-tauri/src/instance/helpers/modpack/modrinth.rs
@@ -11,7 +11,7 @@ use zip::ZipArchive;
 use crate::{
   error::SJMCLResult,
   instance::models::misc::{InstanceError, ModLoaderType},
-  tasks::{download::DownloadParam, PTaskParam},
+  tasks::{download::DownloadParam, RuntimeTaskParam},
 };
 
 structstruck::strike! {
@@ -110,7 +110,7 @@ impl ModrinthManifest {
     Ok(())
   }
 
-  pub fn get_download_params(&self, instance_path: &Path) -> SJMCLResult<Vec<PTaskParam>> {
+  pub fn get_download_params(&self, instance_path: &Path) -> SJMCLResult<Vec<RuntimeTaskParam>> {
     self
       .files
       .iter()
@@ -119,7 +119,7 @@ impl ModrinthManifest {
           .downloads
           .first()
           .ok_or(InstanceError::InvalidSourcePath)?;
-        Ok(PTaskParam::Download(DownloadParam {
+        Ok(RuntimeTaskParam::Download(DownloadParam {
           src: url::Url::parse(download_url).map_err(|_| InstanceError::InvalidSourcePath)?,
           sha1: Some(file.hashes.sha1.clone()),
           dest: instance_path.join(&file.path),
diff --git a/src-tauri/src/launch/helpers/file_validator.rs b/src-tauri/src/launch/helpers/file_validator.rs
index b585a009c..e8a7bcfaf 100644
--- a/src-tauri/src/launch/helpers/file_validator.rs
+++ b/src-tauri/src/launch/helpers/file_validator.rs
@@ -12,7 +12,7 @@ use crate::{
     helpers::misc::{convert_url_to_target_source, get_download_api},
     models::{ResourceType, SourceType},
   },
-  tasks::{download::DownloadParam, PTaskParam},
+  tasks::{download::DownloadParam, RuntimeTaskParam},
   utils::fs::validate_sha1,
 };
 use futures::future::join_all;
@@ -74,7 +74,7 @@ pub async fn get_invalid_library_files(
   library_path: &Path,
   client_info: &McClientInfo,
   check_hash: bool,
-) -> SJMCLResult<Vec<PTaskParam>> {
+) -> SJMCLResult<Vec<RuntimeTaskParam>> {
   let mut artifacts = Vec::new();
   artifacts.extend(get_native_library_artifacts(client_info));
   artifacts.extend(get_nonnative_library_artifacts(client_info));
@@ -101,7 +101,7 @@ pub async fn get_invalid_library_files(
         ],
         &source,
       )?;
-      Ok(Some(PTaskParam::Download(DownloadParam {
+      Ok(Some(RuntimeTaskParam::Download(DownloadParam {
         src,
         dest: file_path,
         filename: None,
@@ -111,7 +111,7 @@ pub async fn get_invalid_library_files(
     }
   });
 
-  let results: Vec<SJMCLResult<Option<PTaskParam>>> = join_all(futs).await;
+  let results: Vec<SJMCLResult<Option<RuntimeTaskParam>>> = join_all(futs).await;
 
   let mut params = Vec::new();
   for r in results {
@@ -268,7 +268,7 @@ pub async fn get_invalid_assets(
   source: SourceType,
   asset_path: &Path,
   check_hash: bool,
-) -> SJMCLResult<Vec<PTaskParam>> {
+) -> SJMCLResult<Vec<RuntimeTaskParam>> {
   let assets_download_api = get_download_api(source, ResourceType::Assets)?;
   let asset_index_path = asset_path.join(format!("indexes/{}.json", client_info.asset_index.id));
@@ -284,12 +284,12 @@ pub async fn get_invalid_assets(
       let exists = fs::try_exists(&dest).await?;
 
       if exists && (!check_hash || validate_sha1(dest.clone(), item.hash.clone()).is_ok()) {
-        Ok::<Option<PTaskParam>, crate::error::SJMCLError>(None)
+        Ok::<Option<RuntimeTaskParam>, crate::error::SJMCLError>(None)
       } else {
         let src = assets_download_api
           .join(&path_in_repo)
           .map_err(crate::error::SJMCLError::from)?;
-        Ok(Some(PTaskParam::Download(DownloadParam {
+        Ok(Some(RuntimeTaskParam::Download(DownloadParam {
           src,
           dest,
           filename: None,
@@ -299,7 +299,7 @@ pub async fn get_invalid_assets(
     }
   });
 
-  let results: Vec<SJMCLResult<Option<PTaskParam>>> = join_all(futs).await;
+  let results: Vec<SJMCLResult<Option<RuntimeTaskParam>>> = join_all(futs).await;
 
   let mut params = Vec::new();
   for r in results {
diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs
index 72f57dd1a..51ef99ddb 100644
--- a/src-tauri/src/lib.rs
+++ b/src-tauri/src/lib.rs
@@ -13,6 +13,8 @@ mod utils;
 use account::{
   helpers::authlib_injector::info::refresh_and_update_auth_servers, models::AccountInfo,
 };
+
+use async_speed_limit::Limiter;
 use instance::helpers::misc::refresh_and_update_instances;
 use instance::models::misc::Instance;
 use launch::models::LaunchingState;
@@ -27,7 +29,7 @@
 use std::{collections::HashMap, sync::OnceLock};
 use storage::Storage;
 use tasks::monitor::TaskMonitor;
 use tauri_plugin_log::{Target, TargetKind};
-use utils::{portable::is_portable, web::build_sjmcl_client};
+use utils::{portable::is_portable, web::WebConfig};
 
 #[cfg_attr(mobile, tauri::mobile_entry_point)]
 use tauri::menu::MenuBuilder;
@@ -136,17 +138,11 @@ pub async fn run() {
       discover::commands::fetch_news_sources_info,
       discover::commands::fetch_news_post_summaries,
       tasks::commands::schedule_progressive_task_group,
-      tasks::commands::cancel_progressive_task,
-      tasks::commands::resume_progressive_task,
-      tasks::commands::stop_progressive_task,
       tasks::commands::retrieve_progressive_task_list,
-      tasks::commands::create_transient_task,
-      tasks::commands::get_transient_task,
-      tasks::commands::set_transient_task_state,
-      tasks::commands::cancel_transient_task,
       tasks::commands::cancel_progressive_task_group,
       tasks::commands::resume_progressive_task_group,
      tasks::commands::stop_progressive_task_group,
+      tasks::commands::retry_progressive_task_group,
       utils::commands::retrieve_memory_info,
       utils::commands::extract_filename,
       utils::commands::retrieve_truetype_font_list,
@@ -173,6 +169,15 @@
       let mut launcher_config: LauncherConfig = LauncherConfig::load().unwrap_or_default();
       launcher_config.setup_with_app(app.handle()).unwrap();
       launcher_config.save().unwrap();
+
+      let limiter = if launcher_config.download.transmission.enable_speed_limit {
+        Some(<Limiter>::new(
+          (launcher_config.download.transmission.speed_limit_value * 1024) as f64,
+        ))
+      } else {
+        None
+      };
+      app.manage(limiter);
       app.manage(Mutex::new(launcher_config));
 
       let account_info = AccountInfo::load().unwrap_or_default();
@@ -189,7 +194,7 @@
 
       app.manage(Box::pin(TaskMonitor::new(app.handle().clone())));
 
-      let client = build_sjmcl_client(app.handle(), true, false);
+      let client = WebConfig::default().build(app.handle());
       app.manage(client);
 
       let launching_queue = Vec::<LaunchingState>::new();
@@ -203,8 +208,8 @@
           .unwrap_or_default();
       });
 
-      // Refresh all auth servers
       let app_handle = app.handle().clone();
+      // Refresh all auth servers
       tauri::async_runtime::spawn(async move {
         refresh_and_update_auth_servers(&app_handle)
           .await
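Editor's note: the `Option<Limiter>` managed above is consumed in `tasks/download.rs` further down via `lim.limit(...)`. As a self-contained reference for async-speed-limit's read-side throttling (the rate, function name, and reader type here are illustrative, not from the PR):

```rust
use async_speed_limit::Limiter;
use tokio_util::compat::FuturesAsyncReadCompatExt;

// Sketch: cap a futures-io reader at 512 KiB/s, then bridge it back into
// tokio's AsyncRead world with `.compat()` so `tokio::io::copy` accepts it.
async fn copy_throttled<R>(reader: R, mut dest: tokio::fs::File) -> std::io::Result<u64>
where
    R: futures::io::AsyncRead + Unpin,
{
    let limiter = <Limiter>::new(512.0 * 1024.0); // bytes per second
    let mut limited = limiter.limit(reader).compat();
    tokio::io::copy(&mut limited, &mut dest).await
}
```

This mirrors the pattern the new `into_runtime_task` uses below, where the limiter wraps the response byte stream before it is copied to disk.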
diff --git a/src-tauri/src/resource/commands.rs b/src-tauri/src/resource/commands.rs
index 6a684dc85..bea288f9a 100644
--- a/src-tauri/src/resource/commands.rs
+++ b/src-tauri/src/resource/commands.rs
@@ -30,7 +30,7 @@ use crate::{
     },
     models::{ModUpdateQuery, OtherResourceFileInfo, OtherResourceInfo, OtherResourceSource},
   },
-  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, PTaskParam},
+  tasks::{commands::schedule_progressive_task_group, download::DownloadParam, RuntimeTaskParam},
 };
 use std::sync::Mutex;
 use tauri::{AppHandle, State};
@@ -142,7 +142,7 @@ pub async fn download_game_server(
   schedule_progressive_task_group(
     app,
     format!("game-server?{}", resource_info.id),
-    vec![PTaskParam::Download(DownloadParam {
+    vec![RuntimeTaskParam::Download(DownloadParam {
       src: url::Url::parse(&download_info.url.clone()).map_err(|_| ResourceError::ParseError)?,
       dest: dest.clone().into(),
       filename: None,
@@ -197,9 +197,8 @@ pub async fn update_mods(
       filename: None,
       sha1: Some(query.sha1.clone()),
     };
-    download_tasks.push(PTaskParam::Download(download_param));
+    download_tasks.push(RuntimeTaskParam::Download(download_param));
   }
-
   schedule_progressive_task_group(app, "mod-update".to_string(), download_tasks, true).await?;
 
   for query in &queries {
diff --git a/src-tauri/src/tasks/background.rs b/src-tauri/src/tasks/background.rs
index 01deff0c9..e48e3b812 100644
--- a/src-tauri/src/tasks/background.rs
+++ b/src-tauri/src/tasks/background.rs
@@ -4,6 +4,5 @@ use tauri::{AppHandle, Manager};
 
 pub async fn monitor_background_process(app: AppHandle) {
   let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.load_saved_tasks().await;
   monitor.background_process().await;
 }
diff --git a/src-tauri/src/tasks/commands.rs b/src-tauri/src/tasks/commands.rs
index cf2e9bac6..94c91cdeb 100644
--- a/src-tauri/src/tasks/commands.rs
+++ b/src-tauri/src/tasks/commands.rs
@@ -1,152 +1,57 @@
-use std::{pin::Pin, time::Duration};
+use super::monitor::{RuntimeGroupDescSnapshot, TaskCommand, TaskMonitor};
+use super::RuntimeTaskParam;
+use crate::error::SJMCLResult;
+use std::pin::Pin;
 use tauri::{AppHandle, Manager};
 
-use crate::{
-  error::SJMCLResult,
-  tasks::{download::DownloadTask, events::GEventStatus, monitor::TaskMonitor},
-  utils::fs::extract_filename,
-};
-
-use super::{PTaskGroupDesc, PTaskParam, SJMCLFutureDesc, THandle};
-
 #[tauri::command]
 pub async fn schedule_progressive_task_group(
   app: AppHandle,
   task_group: String,
-  params: Vec<PTaskParam>,
+  params: Vec<RuntimeTaskParam>,
   with_timestamp: bool,
-) -> SJMCLResult<PTaskGroupDesc> {
+) -> SJMCLResult<RuntimeGroupDescSnapshot> {
   let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  let mut task_descs = Vec::new();
-  let mut future_descs = Vec::new();
 
   let task_group = if with_timestamp {
-    // If with_timestamp is true, append a timestamp to the task group name
-    // to ensure uniqueness and avoid conflicts.
+    // append a timestamp to the task group name to ensure uniqueness.
    let timestamp = chrono::Utc::now().timestamp_millis();
     format!("{task_group}@{timestamp}")
   } else {
     task_group.clone()
   };
-
-  for param in params {
-    let task_id = monitor.get_new_id();
-    match param {
-      PTaskParam::Download(mut param) => {
-        if param.filename.is_none() {
-          param.filename = Some(extract_filename(
-            param.dest.to_str().unwrap_or_default(),
-            true,
-          ));
-        }
-        let task = DownloadTask::new(
-          app.clone(),
-          task_id,
-          Some(task_group.clone()),
-          param,
-          Duration::from_secs(1),
-        );
-        let (f, h) = task
-          .future(app.clone(), monitor.download_rate_limiter.clone())
-          .await?;
-        let task_desc = h.read().unwrap().desc.clone();
-        let future_desc = SJMCLFutureDesc {
-          task_id,
-          f: Box::pin(f),
-          h: h.clone(),
-        };
-        task_descs.push(task_desc);
-        future_descs.push(future_desc);
-      }
-    }
-  }
-  monitor
-    .enqueue_task_group(task_group.clone(), future_descs)
-    .await;
-  Ok(PTaskGroupDesc {
-    task_group,
-    task_descs,
-    status: GEventStatus::Started,
-  })
-}
-
-#[tauri::command]
-pub fn create_transient_task(app: AppHandle, desc: THandle) -> SJMCLResult<()> {
-  let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.create_transient_task(app.clone(), desc);
-  Ok(())
-}
-
-#[tauri::command]
-pub fn set_transient_task_state(app: AppHandle, task_id: u32, state: String) -> SJMCLResult<()> {
-  let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.set_transient_task(app.clone(), task_id, state);
-  Ok(())
+  Ok(monitor.schedule_task_group(task_group, params).await)
 }
 
 #[tauri::command]
-pub fn cancel_transient_task(app: AppHandle, task_id: u32) -> SJMCLResult<()> {
+pub async fn cancel_progressive_task_group(app: AppHandle, task_group: String) -> SJMCLResult<()> {
   let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.cancel_transient_task(task_id);
+  monitor.apply_cmd(task_group, TaskCommand::Cancel).await;
   Ok(())
 }
 
 #[tauri::command]
-pub fn get_transient_task(app: AppHandle, task_id: u32) -> SJMCLResult<Option<THandle>> {
-  let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  Ok(monitor.get_transient_task(task_id))
-}
-
-#[tauri::command]
-pub fn cancel_progressive_task(app: AppHandle, task_id: u32) -> SJMCLResult<()> {
-  let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.cancel_progress(task_id);
-  Ok(())
-}
-
-#[tauri::command]
-pub fn resume_progressive_task(app: AppHandle, task_id: u32) -> SJMCLResult<()> {
-  let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.resume_progress(task_id);
-  Ok(())
-}
-
-#[tauri::command]
-pub async fn restart_progressive_task(app: AppHandle, task_id: u32) -> SJMCLResult<()> {
-  let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.restart_progress(task_id).await;
-  Ok(())
-}
-
-#[tauri::command]
-pub fn stop_progressive_task(app: AppHandle, task_id: u32) -> SJMCLResult<()> {
-  let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.stop_progress(task_id);
-  Ok(())
-}
-
-#[tauri::command]
-pub fn cancel_progressive_task_group(app: AppHandle, task_group: String) -> SJMCLResult<()> {
+pub async fn resume_progressive_task_group(app: AppHandle, task_group: String) -> SJMCLResult<()> {
   let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.cancel_progressive_task_group(task_group);
+  monitor.apply_cmd(task_group, TaskCommand::Resume).await;
   Ok(())
 }
 
 #[tauri::command]
-pub async fn resume_progressive_task_group(app: AppHandle, task_group: String) -> SJMCLResult<()> {
+pub async fn stop_progressive_task_group(app: AppHandle, task_group: String) -> SJMCLResult<()> {
   let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.resume_progressive_task_group(task_group).await;
+  monitor.apply_cmd(task_group, TaskCommand::Stop).await;
   Ok(())
 }
 
 #[tauri::command]
-pub fn stop_progressive_task_group(app: AppHandle, task_group: String) -> SJMCLResult<()> {
+pub async fn retry_progressive_task_group(app: AppHandle, task_group: String) -> SJMCLResult<()> {
   let monitor = app.state::<Pin<Box<TaskMonitor>>>();
-  monitor.stop_progressive_task_group(task_group);
+  monitor.apply_cmd(task_group, TaskCommand::Retry).await;
   Ok(())
 }
 
 #[tauri::command]
-pub fn retrieve_progressive_task_list(app: AppHandle) -> Vec<PTaskGroupDesc> {
+pub fn retrieve_progressive_task_list(app: AppHandle) -> Vec<RuntimeGroupDescSnapshot> {
   let monitor = app.state::<Pin<Box<TaskMonitor>>>();
   monitor.state_list()
 }
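Editor's note: with the transient-task and per-task commands removed, group scheduling is now the single entry point. A hedged sketch of a backend call site for the command above — the URL, paths, and group name are placeholders, and the comment about filename derivation is an assumption based on the monitor's `extract_filename` import below:

```rust
use crate::tasks::{commands::schedule_progressive_task_group, download::DownloadParam, RuntimeTaskParam};
use tauri::{AppHandle, Url};

// Placeholder values throughout; field order follows the new DownloadParam.
async fn queue_example_download(app: AppHandle) -> crate::error::SJMCLResult<()> {
    let params = vec![RuntimeTaskParam::Download(DownloadParam {
        src: Url::parse("https://example.com/some.jar").unwrap(),
        dest: std::path::PathBuf::from("libraries/example/some.jar"),
        sha1: None,     // no checksum validation for this file
        filename: None, // presumably filled by the monitor via extract_filename
    })];
    // `true` appends a millisecond timestamp so repeated groups get unique names.
    let _snapshot = schedule_progressive_task_group(app, "example".into(), params, true).await?;
    Ok(())
}
```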
diff --git a/src-tauri/src/tasks/download.rs b/src-tauri/src/tasks/download.rs
index de1dd1196..3787c0024 100644
--- a/src-tauri/src/tasks/download.rs
+++ b/src-tauri/src/tasks/download.rs
@@ -1,232 +1,145 @@
-use crate::error::{SJMCLError, SJMCLResult};
 use crate::launcher_config::commands::retrieve_launcher_config;
 use crate::utils::fs::validate_sha1;
+use crate::utils::web::with_retry;
 
-use async_speed_limit::Limiter;
-use futures::stream::TryStreamExt;
-use futures::StreamExt;
+use super::monitor::{RuntimeGroupStateRwLock, RuntimeTaskDescRwLock, RuntimeTaskProgressReader};
+use super::reporter::TaskReporter;
+
+use async_speed_limit::{clock::StandardClock, Limiter};
+use futures::stream::{Stream, TryStreamExt};
 use serde::{Deserialize, Serialize};
-use std::error::Error;
 use std::future::Future;
 use std::path::PathBuf;
-use std::sync::{Arc, RwLock};
-use std::time::Duration;
 use tauri::{AppHandle, Manager, Url};
 use tauri_plugin_http::reqwest;
 use tauri_plugin_http::reqwest::header::RANGE;
 use tokio::io::AsyncSeekExt;
 use tokio_util::{bytes, compat::FuturesAsyncReadCompatExt};
 
-use super::super::utils::web::with_retry;
-use super::streams::desc::{PDesc, PStatus};
-use super::streams::reporter::Reporter;
-use super::streams::ProgressStream;
-use super::*;
-
 #[derive(Serialize, Deserialize, Clone, Debug)]
 #[serde(rename_all = "camelCase")]
 pub struct DownloadParam {
   pub src: Url,
   pub dest: PathBuf,
-  pub filename: Option<String>,
   pub sha1: Option<String>,
+  pub filename: Option<String>,
 }
 
-pub struct DownloadTask {
-  p_handle: PTaskHandle,
-  param: DownloadParam,
-  dest_path: PathBuf,
-  report_interval: Duration,
+async fn send_request(
+  app_handle: &AppHandle,
+  current: i64,
+  param: &DownloadParam,
+) -> std::io::Result<reqwest::Response> {
+  let state = app_handle.state::<reqwest::Client>();
+  let client = with_retry(state.inner().clone());
+  let request = if current == 0 {
+    client.get(param.src.clone())
+  } else {
+    client
+      .get(param.src.clone())
+      .header(RANGE, format!("bytes={current}-"))
+  };
+  let response = request.send().await.map_err(std::io::Error::other)?;
+  let response = response.error_for_status().map_err(std::io::Error::other)?;
+  Ok(response)
 }
 
-impl DownloadTask {
-  pub fn new(
-    app_handle: AppHandle,
-    task_id: u32,
-    task_group: Option<String>,
-    param: DownloadParam,
-    report_interval: Duration,
-  ) -> Self {
-    let cache_dir = retrieve_launcher_config(app_handle.clone())
-      .unwrap()
-      .download
-      .cache
-      .directory;
-    DownloadTask {
-      p_handle: PTaskHandle::new(
-        PDesc::<PTaskParam>::new(
-          task_id,
-          task_group.clone(),
-          0,
-          PTaskParam::Download(param.clone()),
-          PStatus::InProgress,
-        ),
-        Duration::from_secs(1),
-        cache_dir.clone().join(format!("task-{task_id}.json")),
-        Reporter::new(
-          0,
-          Duration::from_secs(1),
-          TauriEventSink::new(app_handle.clone()),
-        ),
-      ),
-      param: param.clone(),
-      dest_path: cache_dir.clone().join(param.dest.clone()),
-      report_interval,
-    }
-  }
+async fn create_resp_stream(
+  app_handle: &AppHandle,
+  current: i64,
+  param: &DownloadParam,
+) -> std::io::Result<(
+  impl Stream<Item = std::io::Result<bytes::Bytes>> + Send,
+  i64,
+)> {
+  let resp = send_request(app_handle, current, param).await?;
+  let total_progress = if current == 0 {
+    resp.content_length().unwrap() as i64
+  } else {
+    -1
+  };
+  Ok((
+    resp.bytes_stream().map_err(std::io::Error::other),
+    total_progress,
+  ))
+}
 
-  pub fn from_descriptor(
-    app_handle: AppHandle,
-    desc: PTaskDesc,
-    report_interval: Duration,
-    reset: bool,
-  ) -> Self {
-    let param = match &desc.payload {
-      PTaskParam::Download(param) => param.clone(),
-    };
+pub fn into_runtime_task<'a>(
+  app_handle: AppHandle,
+  group_state: RuntimeGroupStateRwLock,
+  task_desc: RuntimeTaskDescRwLock,
+  param: DownloadParam,
+  reporter: &'a mut TaskReporter,
+) -> impl Future<Output = std::io::Result<()>> + Send + 'a {
+  let download_path = retrieve_launcher_config(app_handle.clone())
+    .unwrap()
+    .download
+    .cache
+    .directory
+    .clone();
 
-    let cache_dir = retrieve_launcher_config(app_handle.clone())
-      .unwrap()
-      .download
-      .cache
-      .directory;
-    let task_id = desc.task_id;
-    let path = cache_dir.join(format!("task-{task_id}.json"));
-    DownloadTask {
-      p_handle: PTaskHandle::new(
-        if reset {
-          PTaskDesc {
-            status: PStatus::Waiting,
-            current: 0,
-            ..desc
-          }
-        } else {
-          PTaskDesc {
-            status: PStatus::Waiting,
-            ..desc
-          }
-        },
-        Duration::from_secs(1),
-        path,
-        Reporter::new(
-          desc.total,
-          Duration::from_secs(1),
-          TauriEventSink::new(app_handle.clone()),
-        ),
-      ),
-      param: param.clone(),
-      dest_path: cache_dir.clone().join(param.dest.clone()),
-      report_interval,
-    }
-  }
+  let lim = app_handle
+    .state::<Option<Limiter<StandardClock>>>()
+    .as_ref()
+    .cloned();
+  let dest_path = download_path.join(param.dest.clone());
 
-  async fn send_request(
-    app_handle: &AppHandle,
-    current: i64,
-    param: &DownloadParam,
-  ) -> SJMCLResult<reqwest::Response> {
-    let state = app_handle.state::<reqwest::Client>();
-    let client = with_retry(state.inner().clone());
-    let request = if current == 0 {
-      client.get(param.src.clone())
+  async move {
+    let current = task_desc.read().unwrap().current;
+    let (resp, total_progress) = create_resp_stream(&app_handle, current as i64, &param).await?;
+    let total_progress = if total_progress != -1 {
+      let mut task_desc = task_desc.write().unwrap();
+      task_desc.total = total_progress as u64;
+      task_desc.state.set_in_progress();
+      task_desc.save().unwrap();
+      total_progress
     } else {
-      client
-        .get(param.src.clone())
-        .header(RANGE, format!("bytes={current}-"))
+      let task_desc = task_desc.write().unwrap();
+      task_desc.total as i64
     };
-    let response = request
-      .send()
-      .await
-      .map_err(|e| SJMCLError(format!("{:?}", e.source())))?;
+    reporter.report_started(total_progress);
+    tokio::fs::create_dir_all(&dest_path.parent().unwrap()).await?;
 
-    let response = response
-      .error_for_status()
-      .map_err(|e| SJMCLError(format!("{:?}", e.source())))?;
-
-    Ok(response)
-  }
-
-  async fn create_resp_stream(
-    app_handle: &AppHandle,
-    current: i64,
-    param: &DownloadParam,
-  ) -> SJMCLResult<(
-    impl Stream<Item = std::io::Result<bytes::Bytes>> + Send,
-    i64,
-  )> {
-    let resp = Self::send_request(app_handle, current, param).await?;
-    let total_progress = if current == 0 {
-      resp.content_length().unwrap() as i64
+    let mut file = if current == 0 {
+      tokio::fs::File::create(&dest_path).await?
     } else {
-      -1
+      let mut f = tokio::fs::OpenOptions::new().open(&dest_path).await?;
+      f.seek(std::io::SeekFrom::Start(current)).await?;
+      f
     };
-    Ok((
-      resp.bytes_stream().map(|res| match res {
-        Ok(bytes) => Ok(bytes),
-        Err(_) => Ok(bytes::Bytes::new()),
-      }),
-      total_progress,
-    ))
-  }
 
-  async fn future_impl(
-    self,
-    app_handle: AppHandle,
-    limiter: Option<Limiter>,
-  ) -> SJMCLResult<(
-    impl Future<Output = SJMCLResult<()>> + Send,
-    Arc<RwLock<PTaskHandle>>,
-  )> {
-    let current = self.p_handle.desc.current;
-    let handle = Arc::new(RwLock::new(self.p_handle));
-    let task_handle = handle.clone();
-    let param = self.param.clone();
-    Ok((
-      async move {
-        let (resp, total_progress) = Self::create_resp_stream(&app_handle, current, &param).await?;
-        let stream = ProgressStream::new(resp, task_handle.clone());
-        tokio::fs::create_dir_all(&self.dest_path.parent().unwrap()).await?;
-        let mut file = if current == 0 {
-          tokio::fs::File::create(&self.dest_path).await?
-        } else {
-          let mut f = tokio::fs::OpenOptions::new().open(&self.dest_path).await?;
-          f.seek(std::io::SeekFrom::Start(current as u64)).await?;
-          f
-        };
-        {
-          let mut task_handle = task_handle.write().unwrap();
-          task_handle.set_total(total_progress);
-          task_handle.mark_started();
-        }
-        if let Some(lim) = limiter {
-          tokio::io::copy(&mut lim.limit(stream.into_async_read()).compat(), &mut file).await?;
-        } else {
-          tokio::io::copy(&mut stream.into_async_read().compat(), &mut file).await?;
-        }
-        drop(file);
-        if task_handle.read().unwrap().status().is_cancelled() {
-          tokio::fs::remove_file(&self.dest_path).await?;
-          Ok(())
-        } else {
-          match param.sha1 {
-            Some(truth) => validate_sha1(param.dest, truth),
-            None => Ok(()),
-          }
-        }
-      },
-      handle,
-    ))
-  }
+    let result = if let Some(lim) = lim {
+      let mut reader = RuntimeTaskProgressReader::new(
+        lim.limit(resp.into_async_read()).compat(),
+        group_state,
+        task_desc.clone(),
+        reporter,
+      );
+      tokio::io::copy(&mut reader, &mut file).await
+    } else {
+      let mut reader = RuntimeTaskProgressReader::new(
+        resp.into_async_read().compat(),
+        group_state,
+        task_desc.clone(),
+        reporter,
+      );
+      tokio::io::copy(&mut reader, &mut file).await
+    }
+    .map(|_| ());
+
+    drop(file);
+    if task_desc.read().unwrap().state.is_cancelled() {
+      tokio::fs::remove_file(&dest_path).await?;
+    }
 
-  pub async fn future(
-    self,
-    app_handle: AppHandle,
-    limiter: Option<Limiter>,
-  ) -> SJMCLResult<(
-    impl Future<Output = SJMCLResult<()>> + Send,
-    Arc<RwLock<PTaskHandle>>,
-  )> {
-    Self::future_impl(self, app_handle, limiter).await
+    if result.is_ok() {
+      match param.sha1 {
+        Some(truth) => validate_sha1(dest_path, truth).map_err(|e| std::io::Error::other(e.0)),
+        None => Ok(()),
+      }
+    } else {
+      result
+    }
   }
 }
diff --git a/src-tauri/src/tasks/events.rs b/src-tauri/src/tasks/events.rs
index e65aea367..0148817f2 100644
--- a/src-tauri/src/tasks/events.rs
+++ b/src-tauri/src/tasks/events.rs
@@ -1,6 +1,4 @@
-use super::streams::reporter::Sink;
-use super::PTaskDesc;
-use super::THandle;
+use super::reporter::Sink;
 use serde::{Deserialize, Serialize};
 use tauri::{AppHandle, Emitter};
 use tokio::time::Duration;
@@ -10,11 +8,7 @@ const TASK_GROUP_UPDATE_EVENT: &str = "task:group-update";
 
 #[derive(Serialize, Deserialize, Clone)]
 #[serde(tag = "status")]
-pub enum PEventStatus {
-  #[serde(rename_all = "camelCase")]
-  Created {
-    desc: PTaskDesc,
-  },
+pub enum TaskEventPayload {
   #[serde(rename_all = "camelCase")]
   Started {
     total: i64,
@@ -37,13 +31,13 @@
 
 #[derive(Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct PEvent<'a> {
+pub struct TaskEvent<'a> {
   pub id: u32,
   pub task_group: Option<&'a str>,
-  pub event: PEventStatus,
+  pub event: TaskEventPayload,
 }
 
-impl<'a> PEvent<'a> {
+impl<'a> TaskEvent<'a> {
   pub fn emit(self, app: &AppHandle) {
     app
       .emit_to("main", TASK_PROGRESS_UPDATE_EVENT, self)
@@ -54,7 +48,7 @@
     Self {
       id,
       task_group,
-      event: PEventStatus::Started { total },
+      event: TaskEventPayload::Started { total },
     }
     .emit(app);
   }
@@ -63,7 +57,7 @@
     Self {
       id,
       task_group,
-      event: PEventStatus::Failed { reason },
+      event: TaskEventPayload::Failed { reason },
     }
     .emit(app);
   }
@@ -72,7 +66,7 @@
     Self {
       id,
       task_group,
-      event: PEventStatus::Stopped,
+      event: TaskEventPayload::Stopped,
     }
     .emit(app);
   }
@@ -81,7 +75,7 @@
     Self {
       id,
       task_group,
-      event: PEventStatus::Cancelled,
+      event: TaskEventPayload::Cancelled,
     }
     .emit(app);
   }
@@ -90,16 +84,7 @@
     Self {
       id,
       task_group,
-      event: PEventStatus::Completed,
-    }
-    .emit(app);
-  }
-
-  pub fn emit_created(app: &AppHandle, id: u32, task_group: Option<&'a str>, desc: PTaskDesc) {
-    Self {
-      id,
-      task_group,
-      event: PEventStatus::Created { desc },
+      event: TaskEventPayload::Completed,
     }
     .emit(app);
   }
@@ -116,7 +101,7 @@
     Self {
       id,
       task_group,
-      event: PEventStatus::InProgress {
+      event: TaskEventPayload::InProgress {
         percent,
         current,
         estimated_time,
@@ -128,7 +113,8 @@
 }
 
 #[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
-pub enum GEventStatus {
+pub enum GroupEventPayload {
+  Created,
   Started,
   Stopped,
   Failed,
@@ -138,74 +124,81 @@
 
 #[derive(Serialize, Clone)]
 #[serde(rename_all = "camelCase")]
-pub struct GEvent<'a> {
+pub struct GroupEvent<'a> {
   pub task_group: &'a str,
-  pub event: GEventStatus,
+  pub event: GroupEventPayload,
 }
 
-impl<'a> GEvent<'a> {
+impl<'a> GroupEvent<'a> {
   fn emit(self, app: &AppHandle) {
     app.emit_to("main", TASK_GROUP_UPDATE_EVENT, self).unwrap();
   }
+  pub fn emit_group_created(app: &AppHandle, task_group: &'a str) {
+    Self {
+      task_group,
+      event: GroupEventPayload::Created,
+    }
+    .emit(app);
+  }
   pub fn emit_group_started(app: &AppHandle, task_group: &'a str) {
     Self {
       task_group,
-      event: GEventStatus::Started,
+      event: GroupEventPayload::Started,
     }
     .emit(app);
   }
   pub fn emit_group_failed(app: &AppHandle, task_group: &'a str) {
     Self {
       task_group,
-      event: GEventStatus::Failed,
+      event: GroupEventPayload::Failed,
     }
     .emit(app);
   }
   pub fn emit_group_completed(app: &AppHandle, task_group: &'a str) {
     Self {
       task_group,
-      event: GEventStatus::Completed,
+      event: GroupEventPayload::Completed,
     }
     .emit(app);
   }
   pub fn emit_group_stopped(app: &AppHandle, task_group: &'a str) {
     Self {
       task_group,
-      event: GEventStatus::Stopped,
+      event: GroupEventPayload::Stopped,
     }
     .emit(app);
   }
   pub fn emit_group_cancelled(app: &AppHandle, task_group: &'a str) {
     Self {
       task_group,
-      event: GEventStatus::Cancelled,
+      event: GroupEventPayload::Cancelled,
     }
     .emit(app);
   }
 }
 
-pub struct TauriEventSink {
+pub struct TaskEventSink {
   app: AppHandle,
 }
 
-impl TauriEventSink {
+impl TaskEventSink {
   pub fn new(app: AppHandle) -> Self {
     Self { app }
   }
 }
 
-impl Sink for TauriEventSink {
+impl Sink for TaskEventSink {
   fn report_started(&self, task_id: u32, task_group: Option<&str>, total: i64) {
-    PEvent::emit_started(&self.app, task_id, task_group, total);
+    TaskEvent::emit_started(&self.app, task_id, task_group, total);
   }
   fn report_stopped(&self, task_id: u32, task_group: Option<&str>) {
-    PEvent::emit_stopped(&self.app, task_id, task_group);
+    TaskEvent::emit_stopped(&self.app, task_id, task_group);
   }
   fn report_cancelled(&self, task_id: u32, task_group: Option<&str>) {
-    PEvent::emit_cancelled(&self.app, task_id, task_group);
+    TaskEvent::emit_cancelled(&self.app, task_id, task_group);
   }
   fn report_completion(&self, task_id: u32, task_group: Option<&str>) {
-    PEvent::emit_completed(&self.app, task_id, task_group);
+    TaskEvent::emit_completed(&self.app, task_id, task_group);
   }
   fn report_progress(
     &self,
@@ -217,7 +210,7 @@
     estimated_time: Option<Duration>,
     speed: f64,
   ) {
-    PEvent::emit_in_progress(
+    TaskEvent::emit_in_progress(
       &self.app,
       task_id,
       task_group,
@@ -228,32 +221,6 @@
     );
   }
   fn report_failed(&self, task_id: u32, task_group: Option<&str>, reason: String) {
-    PEvent::emit_failed(&self.app, task_id, task_group, reason);
-  }
-}
-
-#[derive(Serialize, Clone)]
-pub struct TEvent<'a> {
-  pub id: u32,
-  pub task_group: Option<&'a str>,
-  pub state: &'a str,
-}
-
-impl<'a> TEvent<'a> {
-  pub fn new(desc: &'a THandle) -> Self {
-    Self {
-      id: desc.task_id,
-      task_group: desc.task_group.as_deref(),
-      state: desc.state.as_str(),
-    }
-  }
-  pub fn emit(self, app: &AppHandle) {
-    if let Some(tg) = self.task_group {
-      app.emit_to("main", tg, self).unwrap();
-    } else {
-      app
-        .emit_to("main", std::format!("task-{}", self.id).as_str(), self)
-        .unwrap();
-    }
+    TaskEvent::emit_failed(&self.app, task_id, task_group, reason);
   }
 }
diff --git a/src-tauri/src/tasks/mod.rs b/src-tauri/src/tasks/mod.rs
index 8c00a6f52..cd1b3ab52 100644
--- a/src-tauri/src/tasks/mod.rs
+++ b/src-tauri/src/tasks/mod.rs
@@ -3,48 +3,12 @@
 pub mod commands;
 pub mod download;
 pub mod events;
 pub mod monitor;
-pub mod streams;
+pub mod reporter;
 
-use crate::error::SJMCLResult;
-use download::DownloadParam;
-use events::TauriEventSink;
-use futures::stream::Stream;
 use serde::{Deserialize, Serialize};
-use std::future::Future;
-use std::pin::Pin;
-use std::sync::{Arc, RwLock};
-use streams::{GDesc, PDesc, PHandle};
-use tokio::time::Duration;
 
-pub type SJMCLBoxedFuture = Pin<Box<dyn Future<Output = SJMCLResult<()>> + Send>>;
-
-pub struct SJMCLFuture {
-  pub task_id: u32,
-  pub task_group: Option<String>,
-  pub f: SJMCLBoxedFuture,
-}
-
-pub struct SJMCLFutureDesc {
-  pub task_id: u32,
-  pub f: SJMCLBoxedFuture,
-  pub h: Arc<RwLock<PTaskHandle>>,
-}
-
-type PTaskHandle = PHandle<PTaskParam, TauriEventSink>;
-type PTaskDesc = PDesc<PTaskParam>;
-type PTaskGroupDesc = GDesc<PTaskDesc>;
-
-#[derive(Serialize, Deserialize, Clone)]
-pub struct THandle {
-  #[serde(default)]
-  pub task_id: u32,
-  pub task_group: Option<String>,
-  pub task_type: String,
-  pub state: String,
-}
-
-#[derive(Serialize, Deserialize, Clone, Debug)]
-#[serde(tag = "taskType", rename_all = "camelCase")]
-pub enum PTaskParam {
-  Download(DownloadParam),
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(tag = "type")]
+pub enum RuntimeTaskParam {
+  Download(download::DownloadParam),
 }
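Editor's note: `RuntimeTaskParam` is internally tagged, so the serialized payload carries a `"type": "Download"` discriminator with `DownloadParam`'s camelCase fields flattened alongside it. A quick illustration of the wire shape (placeholder URL; assumes `serde_json` is available, which the monitor below already uses):

```rust
use crate::tasks::{download::DownloadParam, RuntimeTaskParam};

// With #[serde(tag = "type")], the variant name becomes a "type" field, e.g.:
// {"type":"Download","src":"https://example.com/a.jar","dest":"a.jar","sha1":null,"filename":null}
fn print_wire_shape() {
    let param = RuntimeTaskParam::Download(DownloadParam {
        src: tauri::Url::parse("https://example.com/a.jar").unwrap(),
        dest: std::path::PathBuf::from("a.jar"),
        sha1: None,
        filename: None,
    });
    println!("{}", serde_json::to_string(&param).unwrap());
}
```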
InProgress, + #[serde(rename_all = "camelCase")] + Completed { + completed_at: SystemTime, + }, + Cancelled, + Pending, +} -use super::events::{GEvent, TEvent}; -use super::SJMCLFuture; -use super::*; +impl RuntimeState { + pub fn is_stopped(&self) -> bool { + matches!(self, RuntimeState::Stopped { .. }) + } -pub struct GroupMonitor { - pub phs: HashMap>>, - pub status: GEventStatus, -} + pub fn is_failed(&self) -> bool { + matches!(self, RuntimeState::Failed { .. }) + } -pub struct TaskMonitor { - app_handle: AppHandle, - id_counter: AtomicU32, - phs: RwLock>>>, - ths: RwLock>, - tasks: Arc>>>, - concurrency: Arc, - tx: FlumeSender, - rx: FlumeReceiver, - group_map: Arc>>, - stopped_futures: Arc>>, - pub download_rate_limiter: Option, -} + pub fn is_completed(&self) -> bool { + matches!(self, RuntimeState::Completed { .. }) + } + pub fn is_in_progress(&self) -> bool { + matches!(self, RuntimeState::InProgress) + } + pub fn is_pending(&self) -> bool { + matches!(self, RuntimeState::Pending) + } -impl TaskMonitor { - pub fn new(app_handle: AppHandle) -> Self { - let config = retrieve_launcher_config(app_handle.clone()).unwrap(); - let (tx, rx) = flume::unbounded(); - TaskMonitor { - app_handle: app_handle.clone(), - id_counter: AtomicU32::new(0), - phs: RwLock::new(HashMap::new()), - ths: RwLock::new(HashMap::new()), - tasks: Arc::new(Mutex::new(HashMap::new())), - concurrency: Arc::new(Semaphore::new( - if config.download.transmission.auto_concurrent { - std::thread::available_parallelism().unwrap().into() - } else { - config.download.transmission.concurrent_count - }, - )), - tx, - rx, - group_map: Arc::new(RwLock::new(HashMap::new())), - stopped_futures: Arc::new(Mutex::new(Vec::new())), - download_rate_limiter: if config.download.transmission.enable_speed_limit { - Some(Limiter::new( - (config.download.transmission.speed_limit_value as i64 * 1024) as f64, - )) - } else { - None - }, - } + pub fn is_cancelled(&self) -> bool { + matches!(self, RuntimeState::Cancelled) } - #[allow(clippy::manual_flatten)] - pub async fn load_saved_tasks(&self) { - let cache_dir = retrieve_launcher_config(self.app_handle.clone()) + pub fn pollable(&self) -> bool { + matches!(self, RuntimeState::InProgress | RuntimeState::Pending) + } + + pub fn set_in_progress(&mut self) { + *self = RuntimeState::InProgress; + } + + pub fn set_pending(&mut self) { + *self = RuntimeState::Pending; + } + + pub fn set_stopped(&mut self) { + *self = RuntimeState::Stopped { + stopped_at: SystemTime::now(), + }; + } + pub fn set_failed(&mut self, reason: String) { + *self = RuntimeState::Failed { reason }; + } + + pub fn set_cancelled(&mut self) { + *self = RuntimeState::Cancelled; + } + + pub fn set_completed(&mut self) { + *self = RuntimeState::Completed { + completed_at: SystemTime::now(), + }; + } +} + +type RuntimeTaskState = RuntimeState; +pub type RuntimeGroupState = RuntimeState; +type RuntimeGroupDescRwLock = Arc>; +type SharedRuntimeTaskHandle = Arc; +type ConcurrentHashMap = Arc>>; +type ConcurrentHashSet = Arc>>; +pub type PinnedFuture<'a> = Pin + Send + 'a>>; +pub type RuntimeGroupStateRwLock = Arc>; +pub type RuntimeTaskDescRwLock = Arc>; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RuntimeTaskDesc { + pub id: u32, + pub group: Option, + started_at: SystemTime, + created_at: SystemTime, + #[serde(skip)] + pub path: PathBuf, + pub current: u64, + pub total: u64, + pub param: RuntimeTaskParam, + pub state: RuntimeTaskState, +} + +impl RuntimeTaskDesc { + pub fn new(app: AppHandle, id: u32, group: 
Option, param: RuntimeTaskParam) -> Self { + let cache_dir = retrieve_launcher_config(app.clone()) .unwrap() .download .cache .directory; - - for entry in glob(&format!( - "{}/descriptors/task_*.json", - cache_dir.to_str().unwrap() - )) - .unwrap() - { - if let Ok(task) = entry { - match PTaskDesc::load(&task.clone()) { - Ok(desc) => { - let task_id = desc.task_id; - let task_group = desc.task_group.clone(); - match desc.payload { - PTaskParam::Download(_) => { - let task = DownloadTask::from_descriptor( - self.app_handle.clone(), - desc, - Duration::from_secs(1), - false, - ); - let (f, p_handle) = task - .future(self.app_handle.clone(), self.download_rate_limiter.clone()) - .await - .unwrap(); - self.enqueue_task(task_id, task_group, f, p_handle).await; - } - } - } - Err(_) => { - info!("Failed to load task descriptor: {}", task.display()); - } - } - } + Self { + id, + group, + started_at: SystemTime::UNIX_EPOCH, + created_at: SystemTime::now(), + path: cache_dir.clone().join(format!("task-{id}.json")), + current: 0, + total: 0, + param, + state: RuntimeTaskState::Pending, } } - pub fn get_new_id(&self) -> u32 { - self - .id_counter - .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + fn reset(&mut self) { + self.started_at = SystemTime::now(); + self.current = 0; + self.total = 0; + self.state = RuntimeTaskState::Pending; } - pub async fn enqueue_task( - &self, - id: u32, - task_group: Option, - task: T, - p_handle: Arc>, - ) where - T: Future> + Send + 'static, - { - p_handle.write().unwrap().desc.status = PStatus::Waiting; - self.phs.write().unwrap().insert(id, p_handle.clone()); + pub fn save(&self) -> std::io::Result<()> { + let file = std::fs::File::create(&self.path)?; + serde_json::to_writer(file, self)?; + Ok(()) + } - if let Some(ref g) = task_group { - let mut group_map = self.group_map.write().unwrap(); - if let Some(group) = group_map.get_mut(g) { - group.phs.insert(id, p_handle.clone()); - } else { - group_map.insert( - g.clone(), - GroupMonitor { - phs: HashMap::from_iter([(id, p_handle.clone())]), - status: GEventStatus::Started, - }, - ); - } + fn load(path: PathBuf) -> std::io::Result { + let file = std::fs::File::open(&path)?; + let mut desc: Self = serde_json::from_reader(file)?; + desc.path = path; + Ok(desc) + } + + fn snapshot(&self) -> RuntimeTaskDescSnapshot { + match self.param.clone() { + RuntimeTaskParam::Download(param) => RuntimeTaskDescSnapshot { + state: self.state.clone(), + total: self.total, + current: self.current, + start_at: self.started_at, + created_at: self.created_at, + filename: param.filename.unwrap_or_default(), + dest: param.dest, + }, } + } +} - PEvent::emit_created( - &self.app_handle, - id, - task_group.clone().as_deref(), - p_handle.read().unwrap().desc.clone(), - ); +fn into_runtime_task<'a>( + app_handle: AppHandle, + desc: RuntimeTaskDescRwLock, + group_state: RuntimeGroupStateRwLock, + reporter: &'a mut TaskReporter, +) -> PinnedFuture<'a> { + match desc.read().unwrap().param.clone() { + RuntimeTaskParam::Download(param) => Box::pin(download::into_runtime_task( + app_handle, + group_state, + desc.clone(), + param, + reporter, + )), + } +} - let task = Box::pin(async move { - if p_handle.read().unwrap().desc.status.is_cancelled() { - return Ok(()); - } +pub struct RuntimeTaskHandle { + group_state: RuntimeGroupStateRwLock, + desc: RuntimeTaskDescRwLock, +} - let result = task.await; - let mut p_handle = p_handle.write().unwrap(); +impl RuntimeTaskHandle { + pub fn new(group_state: RuntimeGroupStateRwLock, desc: RuntimeTaskDescRwLock) 
-> Self {
+    Self { group_state, desc }
+  }
+}

-      if let Err(e) = result {
-        p_handle.mark_failed(e.0);
-      }
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RuntimeStatefulSet<T>
+where
+  T: Clone + Eq + std::hash::Hash,
+{
+  pending: VecDeque<T>,
+  failed: HashSet<T>,
+  completed: HashSet<T>,
+  stopped: HashSet<T>,
+  cancelled: HashSet<T>,
+  running: HashSet<T>,
+  tracked: usize,
+}

-      Ok(())
-    });
+#[derive(Clone)]
+pub enum TaskCommand {
+  Stop,
+  Resume,
+  Restart,
+  Retry,
+  Cancel,
+}

-    if let Some(ref task_group) = task_group {
-      GEvent::emit_group_started(&self.app_handle, task_group);
+impl<T> RuntimeStatefulSet<T>
+where
+  T: Clone + Eq + std::hash::Hash,
+{
+  fn new<I: IntoIterator<Item = T>>(id_set: I) -> Self {
+    let pending = VecDeque::from_iter(id_set);
+    let tracked = pending.len();
+    Self {
+      pending,
+      failed: Default::default(),
+      completed: Default::default(),
+      stopped: Default::default(),
+      cancelled: Default::default(),
+      running: Default::default(),
+      tracked,
     }
+  }

-    self
-      .tx
-      .send_async(SJMCLFuture {
-        task_id: id,
-        task_group: task_group.clone(),
-        f: task,
-      })
-      .await
-      .unwrap();
-  }
-
-  pub async fn enqueue_task_group(&self, task_group: String, futures: Vec) {
-    let mut hvec: Vec<(u32, Arc>)> = Vec::new();
-
-    for future in futures.iter() {
-      future.h.write().unwrap().desc.status = PStatus::Waiting;
-      self
-        .phs
-        .write()
-        .unwrap()
-        .insert(future.task_id, future.h.clone());
-      PEvent::emit_created(
-        &self.app_handle,
-        future.task_id,
-        Some(task_group.as_ref()),
-        future.h.read().unwrap().desc.clone(),
-      );
-      hvec.push((future.task_id, future.h.clone()));
+  fn start_one(&mut self) -> Option<T> {
+    if let Some(id) = self.pending.pop_front() {
+      self.running.insert(id.clone());
+      Some(id)
+    } else {
+      None
     }
+  }

-    self.group_map.write().unwrap().insert(
-      task_group.clone(),
-      GroupMonitor {
-        phs: HashMap::from_iter(hvec),
-        status: GEventStatus::Started,
-      },
-    );
-    GEvent::emit_group_started(&self.app_handle, &task_group);
+  fn complete_one(&mut self, id: T) {
+    let _ = self.running.remove(&id);
+    self.completed.insert(id);
+  }

-    for future in futures {
-      let task = Box::pin(async move {
-        if future.h.read().unwrap().desc.status.is_cancelled() {
-          return Ok(());
-        }
-        let result = future.f.await;
-        let mut p_handle = future.h.write().unwrap();
-        if let Err(e) = result {
-          p_handle.mark_failed(e.0);
-        }
-        Ok(())
-      });
-      self
-        .tx
-        .send_async(SJMCLFuture {
-          task_id: future.task_id,
-          task_group: Some(task_group.clone()),
-          f: task,
-        })
-        .await
-        .unwrap();
+  fn fail_one(&mut self, id: T) {
+    let _ = self.running.remove(&id);
+    self.failed.insert(id);
+  }
+
+  fn stop_one(&mut self, id: T) {
+    if let Some(pos) = self.pending.iter().position(|x| *x == id) {
+      self.stopped.insert(self.pending.remove(pos).unwrap());
+    } else if let Some(id) = self.running.take(&id) {
+      self.stopped.insert(id.clone());
     }
   }

-  pub async fn background_process(&self) {
-    loop {
-      let future = self.rx.recv_async().await.unwrap();
-      if self
-        .phs
-        .read()
-        .unwrap()
-        .get(&future.task_id)
-        .unwrap()
-        .read()
-        .unwrap()
-        .desc
-        .status
-        .is_cancelled()
-      {
-        continue;
-      }
-      // Check if the task group is stopped before acquiring permit
-      if let Some(ref task_group) = future.task_group {
-        let is_stopped = self
-          .group_map
-          .read()
-          .unwrap()
-          .get(task_group)
-          .map(|g| g.status == GEventStatus::Stopped)
-          .unwrap_or(false);
-
-        if is_stopped {
-          // Store the future in the stopped_futures list and continue to next task
-          self.stopped_futures.lock().unwrap().push(future);
-          continue;
-        }
-      }
+  fn stop_all(&mut self) {
+    self.stopped.extend(self.pending.drain(..));
+    self.stopped.extend(self.running.drain());
+  }

-      // Acquire permit before spawning the task
-      let permit = self.concurrency.clone().acquire_owned().await.unwrap();
-
-      let tasks = self.tasks.clone();
-      let group_map = self.group_map.clone();
-      let app = self.app_handle.clone();
-
-      self.tasks.lock().unwrap().insert(
-        future.task_id,
-        tauri::async_runtime::spawn(async move {
-          // Move the permit into the spawned task
-          let _permit = permit;
-
-          if let Some(task_group) = future.task_group.clone() {
-            // Wait for the task group to be resumed if it is stopped
-            loop {
-              let is_stopped = group_map
-                .read()
-                .unwrap()
-                .get(&task_group)
-                .map(|g| g.status == GEventStatus::Stopped)
-                .unwrap_or(false);
-              if !is_stopped {
-                break;
-              }
-              tokio::time::sleep(std::time::Duration::from_secs(1)).await;
-            }
-          }
+  fn cancel_all(&mut self) {
+    self.cancelled.extend(self.pending.drain(..));
+    self.cancelled.extend(self.stopped.drain());
+    self.cancelled.extend(self.running.drain());
+  }

-          let r = future.f.await;
-          match r {
-            Ok(_) => {
-              if let Some(group_name) = future.task_group {
-                if let Some(group) = group_map.write().unwrap().get_mut(&group_name) {
-                  group.phs.remove(&future.task_id);
-                  if group.phs.is_empty() {
-                    group.status = GEventStatus::Completed;
-                    GEvent::emit_group_completed(&app, &group_name)
-                  }
-                }
-              }
-            }
-            Err(e) => {
-              info!("Task failed: {e:?}");
-              if let Some(group_name) = future.task_group {
-                GEvent::emit_group_failed(&app, &group_name);
-                if let Some(group) = group_map.write().unwrap().remove(&group_name) {
-                  for (_, handle) in group.phs {
-                    let mut handle = handle.write().unwrap();
-                    if handle.desc.status.is_waiting() {
-                      handle.mark_cancelled()
-                    }
-                  }
-                }
-              }
-            }
-          }
-          tasks.lock().unwrap().remove(&future.task_id);
-          // The permit will be automatically released when _permit is dropped
-        }),
-      );
+  fn cancel_one(&mut self, id: T) {
+    if let Some(pos) = self.pending.iter().position(|x| *x == id) {
+      self.cancelled.insert(self.pending.remove(pos).unwrap());
     }
+    self.running.remove(&id);
+    self.stopped.remove(&id);
+    self.cancelled.insert(id);
   }

-  pub fn stop_progress(&self, id: u32) {
-    if let Some(handle) = self.phs.read().unwrap().get(&id) {
-      handle.write().unwrap().mark_stopped();
-    }
+  fn resume_all(&mut self) {
+    self.pending.extend(self.stopped.drain());
   }

-  pub fn resume_progress(&self, id: u32) {
-    if let Some(handle) = self.phs.read().unwrap().get(&id) {
-      handle.write().unwrap().mark_resumed();
-    }
+  fn resume_one(&mut self, id: T) {
+    self.stopped.remove(&id);
+    self.pending.push_back(id);
   }

-  pub fn cancel_progress(&self, id: u32) {
-    if let Some(p_handle) = self.phs.read().unwrap().get(&id) {
-      p_handle.write().unwrap().mark_cancelled();
-      if let Some(j_handle) = self.tasks.lock().unwrap().remove(&id) {
-        j_handle.abort();
-      }
-    }
+  fn retry_one(&mut self, id: T) {
+    self.cancelled.remove(&id);
+    self.failed.remove(&id);
+    self.pending.push_back(id);
   }

-  pub async fn restart_progress(&self, id: u32) {
-    let handle = self.phs.write().unwrap().remove(&id);
-    if let Some(handle) = handle {
-      let desc = handle.read().unwrap().desc.clone();
-      let task_group = desc.task_group.clone();
-      let task_state = desc.status.clone();
-      let j_handle = self.tasks.lock().unwrap().remove(&id).unwrap();
-      if !task_state.is_completed() {
-        handle.write().unwrap().mark_cancelled();
-        j_handle.abort();
-      }
-      match desc.payload {
-        PTaskParam::Download(_) => {
-          let task = DownloadTask::from_descriptor(
-            self.app_handle.clone(),
-            desc,
-            Duration::from_secs(1),
-            true,
-          );
-          let (f, new_h) = task
-            .future(self.app_handle.clone(), self.download_rate_limiter.clone())
-            .await
-            .unwrap();
-          self.enqueue_task(id, task_group, f, new_h).await;
-        }
+  fn retry_all(&mut self) {
+    self.pending.extend(self.failed.drain());
+    self.pending.extend(self.cancelled.drain())
+  }
+
+  fn restart_one(&mut self, id: T) {
+    self.cancelled.remove(&id);
+    self.stopped.remove(&id);
+    self.failed.remove(&id);
+    self.completed.remove(&id);
+    self.pending.push_back(id);
+  }
+
+  fn restart_all(&mut self) {
+    self.pending.extend(self.stopped.drain());
+    self.pending.extend(self.failed.drain());
+    self.pending.extend(self.completed.drain());
+    self.pending.extend(self.cancelled.drain())
+  }
+
+  pub fn combine_all_states(&self) -> RuntimeState {
+    if !self.cancelled.is_empty() {
+      return RuntimeState::Cancelled;
+    }
+
+    if !self.stopped.is_empty() {
+      return RuntimeState::Stopped {
+        stopped_at: SystemTime::now(),
+      };
+    }
+
+    if !self.failed.is_empty() && self.pending.is_empty() {
+      return RuntimeState::Failed {
+        reason: "Some tasks failed".to_string(),
+      };
+    }
+
+    if self.pending.len() == self.tracked {
+      return RuntimeState::Pending;
+    }
+
+    if self.completed.len() == self.tracked {
+      RuntimeState::Completed {
+        completed_at: SystemTime::now(),
      }
+    } else {
+      RuntimeState::InProgress
     }
   }

-  pub fn create_transient_task(&self, app: AppHandle, mut handle: THandle) {
-    handle.task_id = self.get_new_id();
-    TEvent::new(&handle).emit(&app);
-    self.ths.write().unwrap().insert(handle.task_id, handle);
+  fn retry_set(&self) -> Vec<T> {
+    self
+      .cancelled
+      .iter()
+      .cloned()
+      .chain(self.failed.iter().cloned())
+      .collect()
+  }
+
+  fn restart_set(&self) -> Vec<T> {
+    self
+      .cancelled
+      .iter()
+      .cloned()
+      .chain(self.failed.iter().cloned())
+      .chain(self.running.iter().cloned())
+      .chain(self.completed.iter().cloned())
+      .chain(self.stopped.iter().cloned())
+      .collect()
   }

-  pub fn set_transient_task(&self, app: AppHandle, task_id: u32, state: String) {
-    if let Some(desc) = self.ths.write().unwrap().get_mut(&task_id) {
-      desc.state = state;
-      TEvent::new(desc).emit(&app);
+  fn apply_all(&mut self, cmd: TaskCommand) {
+    match cmd {
+      TaskCommand::Stop => self.stop_all(),
+      TaskCommand::Resume => self.resume_all(),
+      TaskCommand::Restart => self.restart_all(),
+      TaskCommand::Cancel => self.cancel_all(),
+      TaskCommand::Retry => self.retry_all(),
     }
   }

-  pub fn cancel_transient_task(&self, task_id: u32) {
-    self.ths.write().unwrap().remove(&task_id);
+  fn apply_one(&mut self, id: T, cmd: TaskCommand) {
+    match cmd {
+      TaskCommand::Stop => self.stop_one(id),
+      TaskCommand::Resume => self.resume_one(id),
+      TaskCommand::Restart => self.restart_one(id),
+      TaskCommand::Cancel => self.cancel_one(id),
+      TaskCommand::Retry => self.retry_one(id),
+    }
   }
+}

-  pub fn get_transient_task(&self, task_id: u32) -> Option<THandle> {
-    self.ths.read().unwrap().get(&task_id).cloned()
+impl RuntimeState {
+  pub fn apply(&mut self, cmd: TaskCommand) {
+    match cmd {
+      TaskCommand::Stop => self.set_stopped(),
+      TaskCommand::Resume => self.set_in_progress(),
+      TaskCommand::Restart => self.set_pending(),
+      TaskCommand::Retry => self.set_pending(),
+      TaskCommand::Cancel => self.set_cancelled(),
+    }
   }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct RuntimeGroupDesc {
+  name: String,
+  state: RuntimeGroupStateRwLock,
+  started_at: SystemTime,
+  created_at: SystemTime,
+  task_desc_map: HashMap<u32, RuntimeTaskDescRwLock>,
+  stateful_set: RuntimeStatefulSet<u32>,
+}

-  pub fn cancel_progressive_task_group(&self, task_group: String) {
-    if let Some(group) = self.group_map.write().unwrap().remove(&task_group) {
-      for handle in group.phs.values() {
-        handle.write().unwrap().mark_cancelled();
-        if let Some(join_handle) = self
-          .tasks
-          .lock()
-          .unwrap()
-          .remove(&handle.read().unwrap().desc.task_id)
-        {
-          join_handle.abort();
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct RuntimeTaskDescSnapshot {
+  state: RuntimeTaskState,
+  total: u64,
+  current: u64,
+  start_at: SystemTime,
+  created_at: SystemTime,
+  filename: String,
+  dest: PathBuf,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct RuntimeGroupDescSnapshot {
+  name: String,
+  state: RuntimeGroupState,
+  task_desc_map: HashMap<u32, RuntimeTaskDescSnapshot>,
+}
+
+impl RuntimeGroupDesc {
+  fn new(name: String, task_desc_map: HashMap<u32, RuntimeTaskDescRwLock>) -> Self {
+    let id_set: Vec<u32> = task_desc_map.keys().copied().collect();
+    Self {
+      name,
+      state: Arc::new(RwLock::new(RuntimeState::Pending)),
+      started_at: SystemTime::UNIX_EPOCH,
+      created_at: SystemTime::now(),
+      task_desc_map,
+      stateful_set: RuntimeStatefulSet::new(id_set),
+    }
+  }
+  fn apply(&mut self, handle_map: &mut HashMap<u32, JoinHandle<()>>, cmd: TaskCommand) -> bool {
+    self.state.write().unwrap().apply(cmd.clone());
+    let resched = match cmd {
+      TaskCommand::Restart => {
+        for id in self.stateful_set.restart_set() {
+          handle_map.remove(&id);
+          self
+            .task_desc_map
+            .get_mut(&id)
+            .unwrap()
+            .write()
+            .unwrap()
+            .reset();
+        }
+        true
+      }
+      TaskCommand::Retry => {
+        for id in self.stateful_set.retry_set() {
+          handle_map.remove(&id);
+          self
+            .task_desc_map
+            .get_mut(&id)
+            .unwrap()
+            .write()
+            .unwrap()
+            .reset();
         }
+        true
       }
-      GEvent::emit_group_cancelled(&self.app_handle, &task_group);
+      _ => false,
+    };
+    self.stateful_set.apply_all(cmd.clone());
+    resched
+  }
+  fn snapshot(&self) -> RuntimeGroupDescSnapshot {
+    let task_desc_map = self
+      .task_desc_map
+      .iter()
+      .map(|(id, desc)| (*id, desc.read().unwrap().snapshot()))
+      .collect();
+    RuntimeGroupDescSnapshot {
+      name: self.name.clone(),
+      state: self.state.read().unwrap().clone(),
+      task_desc_map,
     }
   }
+}

-  pub async fn resume_progressive_task_group(&self, task_group: String) {
-    if let Some(group) = self.group_map.write().unwrap().get_mut(&task_group) {
-      group.status = GEventStatus::Started;
+pub struct RuntimeTaskProgressReader<'a, A> {
+  reader: A,
+  handle: RuntimeTaskHandle,
+  reporter: &'a mut TaskReporter,
+  interval: Interval,
+}

-      // Resume existing stopped tasks
-      for handle in group.phs.values() {
-        if handle.read().unwrap().desc.status.is_stopped() {
-          handle.write().unwrap().mark_resumed();
-        }
-      }
+impl<'a, A> RuntimeTaskProgressReader<'a, A> {
+  pub fn new(
+    reader: A,
+    group_state: RuntimeGroupStateRwLock,
+    desc: RuntimeTaskDescRwLock,
+    reporter: &'a mut TaskReporter,
+  ) -> Self
+  where
+    A: AsyncRead + Unpin,
+  {
+    let handle = RuntimeTaskHandle::new(group_state, desc);
+    let interval = tokio::time::interval(Duration::from_secs(1));
+    Self {
+      reader,
+      handle,
+      reporter,
+      interval,
     }
+  }
+}

-    // Re-send all stored stopped futures for this task group
-    let futures_to_resend = {
-      let mut stopped_futures = self.stopped_futures.lock().unwrap();
-      let mut futures = Vec::new();
-
-      // Extract futures that belong to this task group
-      let mut i = 0;
-      while i < stopped_futures.len() {
-        if stopped_futures[i].task_group.as_ref() == Some(&task_group) {
-          futures.push(stopped_futures.remove(i));
-        } else {
-          i += 1;
-        }
-      }
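> Note: `RuntimeTaskProgressReader` above is an `AsyncRead` adapter — every chunk the wrapped reader yields bumps `desc.current`, and the one-second `Interval` throttles persistence and event emission (its `poll_read` impl follows below). The following is a minimal, self-contained sketch of the same byte-counting pattern with the shared descriptor reduced to an `AtomicU64`; names are illustrative, not the launcher's API. One detail worth noting: `poll_read` can be handed an already partially filled buffer, so the sketch counts only the bytes added by this call.

```rust
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};

struct CountingReader<R> {
    inner: R,
    bytes_seen: Arc<AtomicU64>, // shared progress counter
}

impl<R: AsyncRead + Unpin> AsyncRead for CountingReader<R> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let this = self.get_mut();
        let before = buf.filled().len();
        // Delegate to the wrapped reader, then record how many new bytes landed.
        Pin::new(&mut this.inner).poll_read(cx, buf).map_ok(|()| {
            let incre = (buf.filled().len() - before) as u64;
            this.bytes_seen.fetch_add(incre, Ordering::Relaxed);
        })
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let progress = Arc::new(AtomicU64::new(0));
    // &[u8] implements AsyncRead, so it stands in for a network stream here.
    let mut reader = CountingReader {
        inner: &b"hello world"[..],
        bytes_seen: progress.clone(),
    };
    let mut out = Vec::new();
    reader.read_to_end(&mut out).await?;
    assert_eq!(progress.load(Ordering::Relaxed), 11);
    Ok(())
}
```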
+#[derive(Debug, Error)]
+pub enum RuntimeTaskProgressError {
+  #[error("The task has been stopped externally")]
+  ExternalInterrupted,
+}

-      futures
+impl<'a, A> AsyncRead for RuntimeTaskProgressReader<'a, A>
+where
+  A: AsyncRead + Unpin,
+{
+  fn poll_read(
+    self: Pin<&mut Self>,
+    cx: &mut Context<'_>,
+    buf: &mut ReadBuf<'_>,
+  ) -> Poll<std::io::Result<()>> {
+    let this = self.get_mut();
+    {
+      let group_state = this.handle.group_state.read().unwrap();
+      let desc = this.handle.desc.read().unwrap();
+      if !group_state.pollable() || !desc.state.pollable() {
+        return Poll::Ready(Err(std::io::Error::other(Box::new(
+          RuntimeTaskProgressError::ExternalInterrupted,
+        ))));
+      }
     };
+    Pin::new(&mut this.reader).poll_read(cx, buf).map_ok(|()| {
+      let incre = buf.filled().len() as u64;
+      this.handle.desc.write().unwrap().current += incre;
+      if this.interval.poll_tick(cx).is_ready() {
+        let desc = this.handle.desc.read().unwrap();
+        desc.save().unwrap();
+        this.reporter.report_progress(desc.current as i64)
+      }
+    })
+  }
+}

-    // Re-send the futures
-    for future in futures_to_resend {
-      self.tx.send_async(future).await.unwrap();
-    }

-    GEvent::emit_group_started(&self.app_handle, &task_group);
+pub struct IdGenerator {
+  seq: AtomicU32,
+}

+impl Default for IdGenerator {
+  fn default() -> Self {
+    Self { seq: 0.into() }
   }
+}

-  pub fn stop_progressive_task_group(&self, task_group: String) {
-    if let Some(group) = self.group_map.write().unwrap().get_mut(&task_group) {
-      group.status = GEventStatus::Stopped;
-      for handle in group.phs.values() {
-        let status = handle.read().unwrap().desc.status.clone();
-        if status.is_in_progress() || status.is_waiting() {
-          handle.write().unwrap().mark_stopped();
-        }
+impl IdGenerator {
+  fn next_id(&self) -> u32 {
+    self.seq.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+  }
+}
+
+pub struct TaskMonitor {
+  app_handle: tauri::AppHandle,
+  group_descs: ConcurrentHashMap<String, Arc<RwLock<RuntimeGroupDesc>>>,
+  inqueue: ConcurrentHashSet<String>,
+  handle_map: ConcurrentHashMap<u32, JoinHandle<()>>,
+  tx: Sender<String>,
+  rx: Receiver<String>,
+  sema: Arc<Semaphore>,
+  id_gen: IdGenerator,
+  report_period: Duration,
+}
+
+impl TaskMonitor {
+  pub fn new(app_handle: AppHandle) -> Self {
+    let (tx, rx) = flume::unbounded();
+    let config = retrieve_launcher_config(app_handle.clone()).unwrap();
+    let concurrency = usize::min(
+      if config.download.transmission.auto_concurrent {
+        std::thread::available_parallelism().unwrap().into()
+      } else {
+        config.download.transmission.concurrent_count
+      },
+      8,
+    );
+    Self {
+      app_handle,
+      group_descs: Default::default(),
+      inqueue: Default::default(),
+      handle_map: Default::default(),
+      tx,
+      rx,
+      sema: Arc::new(Semaphore::new(concurrency)),
+      id_gen: IdGenerator::default(),
+      report_period: Duration::from_secs(1),
+    }
+  }
+  pub async fn schedule_task_group(
+    &self,
+    group_name: String,
+    params: Vec<RuntimeTaskParam>,
+  ) -> RuntimeGroupDescSnapshot {
+    let task_desc_map: HashMap<u32, RuntimeTaskDescRwLock> =
+      HashMap::from_iter(params.into_iter().map(|param| {
+        let task_id = self.id_gen.next_id();
+        let task_desc = match param {
+          RuntimeTaskParam::Download(mut param) => {
+            if param.filename.is_none() {
+              param.filename = Some(extract_filename(
+                param.dest.to_str().unwrap_or_default(),
+                true,
+              ));
+            }
+            Arc::new(RwLock::new(RuntimeTaskDesc::new(
+              self.app_handle.clone(),
+              task_id,
+              Some(group_name.clone()),
+              RuntimeTaskParam::Download(param),
+            )))
+          }
+        };
+        (task_id, task_desc)
+      }));
+    let group_desc = RuntimeGroupDesc::new(group_name.clone(), task_desc_map);
+    let snapshot = group_desc.snapshot();
+    self
+      .group_descs
+      .lock()
+      .unwrap()
+      .insert(group_name.clone(), Arc::new(RwLock::new(group_desc)));
+    self.inqueue.lock().unwrap().insert(group_name.clone());
+    self.tx.send_async(group_name.clone()).await.unwrap();
+    let group_reporter = GroupReporter::new(self.app_handle.clone(), group_name.clone());
+    group_reporter.report(&RuntimeGroupState::Pending);
+    snapshot
+  }
+
+  pub async fn background_process(&self) {
+    loop {
+      let group_name = self.rx.recv_async().await.unwrap();
+      let group_desc = self
+        .group_descs
+        .lock()
+        .unwrap()
+        .get(&group_name)
+        .unwrap()
+        .clone();
+      let group_reporter = GroupReporter::new(self.app_handle.clone(), group_name.clone());
+      self.inqueue.lock().unwrap().remove(&group_name);
+      loop {
+        let (task_desc, group_state, task_id) = {
+          let mut group_desc = group_desc.write().unwrap();
+          let state = group_desc.state.clone();
+          let mut group_state = state.write().unwrap();
+
+          if !group_state.pollable() {
+            break;
+          }
+
+          if group_state.is_pending() {
+            group_state.set_in_progress();
+            group_reporter.report(&group_state);
+          }
+
+          let task_id_option = group_desc.stateful_set.start_one();
+
+          if task_id_option.is_none() {
+            break;
+          }
+
+          let task_id = task_id_option.unwrap();
+          (
+            group_desc.task_desc_map.get(&task_id).unwrap().clone(),
+            state.clone(),
+            task_id,
+          )
+        };
+
+        let handle = {
+          let permit = self.sema.clone().acquire_owned().await.unwrap();
+          let app_handle = self.app_handle.clone();
+          let report_period = self.report_period;
+          let task_group_state = group_desc.read().unwrap().state.clone();
+          let task_group_desc = group_desc.clone();
+          let task_handle_map = self.handle_map.clone();
+          let task_group_reporter = group_reporter.clone();
+
+          tokio::spawn(async move {
+            // Create Task Reporter for this task
+            let mut task_reporter = {
+              let desc = task_desc.read().unwrap();
+              let group_name = desc.group.clone();
+              TaskReporter::from_desc_interval(
+                app_handle.clone(),
+                group_name,
+                &desc,
+                &report_period,
+              )
+            };
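> Note: the loop above acquires an owned semaphore permit *before* `tokio::spawn` and moves it into the task, so backpressure applies at scheduling time and the permit is released when the task drops it. A runnable sketch of just that pattern (assumes the `tokio` crate; the cap of 2 and the sleep are placeholders):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sema = Arc::new(Semaphore::new(2)); // at most 2 tasks in flight
    let mut handles = Vec::new();
    for id in 0..8u32 {
        // Waits here, outside the task, once 2 permits are out.
        let permit = sema.clone().acquire_owned().await.unwrap();
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when the task finishes
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            println!("task {id} done");
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}
```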
+            // Create the runtime task, erase the type for dispatching.
+            let task = into_runtime_task(
+              app_handle,
+              task_desc.clone(),
+              group_state,
+              &mut task_reporter,
+            );
+            let _permit = permit;
+            let r = task.await;
+            {
+              let mut group_desc = task_group_desc.write().unwrap();
+              let mut group_state = task_group_state.write().unwrap();
+              let mut task_desc = task_desc.write().unwrap();
+              match r {
+                Ok(_) => {
+                  group_desc.stateful_set.complete_one(task_id);
+                  task_desc.state.set_completed();
+                }
+                Err(e) => {
+                  let reason = e.to_string();
+                  if e.downcast::<RuntimeTaskProgressError>().is_err() {
+                    group_desc.stateful_set.fail_one(task_id);
+                    task_desc.state.set_failed(reason.clone());
+                    *group_state = task_desc.state.clone();
+                  } else {
+                    task_desc.state = group_state.clone();
+                  }
+                }
+              }
+              task_reporter.report(&task_desc);
+              task_desc.save().unwrap();
+              if group_state.pollable() {
+                *group_state = group_desc.stateful_set.combine_all_states();
+              }
+              if !group_state.pollable() {
+                task_group_reporter.report(&group_state);
+              }
+            }
+            task_handle_map.lock().unwrap().remove(&task_id);
+          })
+        };
+        self.handle_map.lock().unwrap().insert(task_id, handle);
       }
-      GEvent::emit_group_stopped(&self.app_handle, &task_group);
     }
   }

-  pub fn state_list(&self) -> Vec<PTaskGroupDesc> {
-    self
-      .group_map
-      .read()
+  pub async fn apply_cmd(&self, group_name: String, cmd: TaskCommand) {
+    let group_desc_opt = self.group_descs.lock().unwrap().get(&group_name).cloned();
+    if group_desc_opt.is_none() {
+      return;
+    }
+    let group_desc = group_desc_opt.unwrap();
+    if group_desc
+      .write()
       .unwrap()
-      .iter()
-      .map(|(k, v)| PTaskGroupDesc {
-        task_group: k.clone(),
-        task_descs: v
-          .phs
-          .values()
-          .map(|h| h.read().unwrap().desc.clone())
-          .collect(),
-        status: v.status.clone(),
-      })
+      .apply(&mut self.handle_map.lock().unwrap(), cmd.clone())
+      && !self.inqueue.lock().unwrap().contains(&group_name)
+    {
+      self.tx.send_async(group_name.clone()).await.unwrap();
+    }
+    let group_reporter = GroupReporter::new(self.app_handle.clone(), group_name);
+    group_reporter.report(&group_desc.read().unwrap().state.read().unwrap());
+  }
+
+  pub fn state_list(&self) -> Vec<RuntimeGroupDescSnapshot> {
+    let group_descs = self.group_descs.lock().unwrap();
+    group_descs
+      .values()
+      .map(|desc| desc.read().unwrap().snapshot())
       .collect()
   }

   pub fn has_active_download_tasks(&self) -> bool {
-    let phs = self.phs.read().unwrap();
-    for handle in phs.values() {
-      let desc = handle.read().unwrap();
-      let status = &desc.desc.status;
-      // check if the task is a download task and is in progress or waiting
-      if matches!(desc.desc.payload, super::PTaskParam::Download(_))
-        && (status.is_in_progress() || status.is_waiting())
-      {
-        return true;
-      }
-    }
-    false
+    !self.handle_map.lock().unwrap().is_empty()
   }
 }
diff --git a/src-tauri/src/tasks/reporter.rs b/src-tauri/src/tasks/reporter.rs
new file mode 100644
index 000000000..68a2cefb2
--- /dev/null
+++ b/src-tauri/src/tasks/reporter.rs
@@ -0,0 +1,184 @@
+use super::{
+  events::{GroupEvent, TaskEvent},
+  monitor::{RuntimeGroupState, RuntimeTaskDesc},
+};
+use log::info;
+use std::time::Duration;
+use tauri::AppHandle;
+
+pub trait Sink {
+  fn report_progress(
+    &self,
+    task_id: u32,
+    task_group: Option<&str>,
+    current: i64,
+    total: i64,
+    percentage: u32,
+    estimated_time: Option<f64>,
+    speed: f64,
+  );
+  fn report_completion(&self, task_id: u32, task_group: Option<&str>);
+  fn report_stopped(&self, task_id: u32, task_group: Option<&str>);
+  fn report_cancelled(&self, task_id: u32, task_group: Option<&str>);
+  fn report_started(&self, task_id: u32, task_group: Option<&str>, total: i64);
+  fn report_failed(&self, task_id: u32, task_group: Option<&str>, reason: String);
+}
+
+pub struct TaskReporter {
+  app: AppHandle,
+  id: u32,
+  group: Option<String>,
+  total: i64,
+  last_reported: i64,
+  period: Duration,
+}
+
+impl TaskReporter {
+  pub fn new(app: AppHandle, id: u32, group: Option<String>, total: i64, period: Duration) -> Self {
+    Self {
+      app,
+      id,
+      group,
+      total,
+      last_reported: 0,
+      period,
+    }
+  }
+
+  pub fn set_total(&mut self, total: i64) {
+    self.total = total;
+  }
+
+  pub fn from_desc_interval(
+    app: AppHandle,
+    group: Option<String>,
+    desc: &RuntimeTaskDesc,
+    period: &Duration,
+  ) -> Self {
+    Self {
+      app,
+      id: desc.id,
+      group,
+      total: desc.total as i64,
+      last_reported: desc.current as i64,
+      period: *period,
+    }
+  }
+}
+
+impl TaskReporter {
+  pub fn report(&mut self, desc: &RuntimeTaskDesc) {
+    if desc.current as i64 > self.last_reported {
+      self.report_progress(desc.current as i64);
+    }
+
+    match desc.state {
+      RuntimeGroupState::InProgress => self.report_started(desc.total as i64),
+      RuntimeGroupState::Cancelled => self.report_canceled(),
+      RuntimeGroupState::Stopped { .. } => self.report_stopped(),
+      RuntimeGroupState::Failed { ref reason } => self.report_failed(reason.clone()),
+      RuntimeGroupState::Completed { .. } => self.report_completion(),
+      _ => {}
+    }
+  }
+
+  pub fn report_started(&mut self, total: i64) {
+    self.total = total;
+    TaskEvent::emit_started(&self.app, self.id, self.group.as_deref(), total);
+  }
+  pub fn report_stopped(&self) {
+    TaskEvent::emit_stopped(&self.app, self.id, self.group.as_deref());
+  }
+
+  pub fn report_canceled(&self) {
+    TaskEvent::emit_cancelled(&self.app, self.id, self.group.as_deref());
+  }
+
+  pub fn report_completion(&self) {
+    TaskEvent::emit_completed(&self.app, self.id, self.group.as_deref());
+  }
+
+  pub fn report_progress(&mut self, current: i64) {
+    if current > self.total {
+      return;
+    }
+    let percentage = if self.total > 0 {
+      (current as f64 / self.total as f64 * 100.0).round() as u32
+    } else {
+      0
+    };
+
+    let estimated_time = if self.last_reported > 0 && current > self.last_reported {
+      Some(
+        (self.total - current) as f64 / (current - self.last_reported) as f64
+          * self.period.as_secs_f64(),
+      )
+    } else {
+      None
+    };
+    let speed = (current - self.last_reported) as f64 / self.period.as_secs_f64();
+
+    TaskEvent::emit_in_progress(
+      &self.app,
+      self.id,
+      self.group.as_deref(),
+      percentage as f64 / 100.0,
+      current,
+      estimated_time.map(Duration::from_secs_f64),
+      speed,
+    );
+
+    self.last_reported = current;
+  }
+
+  pub fn report_failed(&self, reason: String) {
+    TaskEvent::emit_failed(&self.app, self.id, self.group.as_deref(), reason);
+  }
+}
+
+#[derive(Clone)]
+pub struct GroupReporter {
+  app: AppHandle,
+  group: String,
+}
+
+impl GroupReporter {
+  pub fn new(app: AppHandle, group: String) -> Self {
+    Self { app, group }
+  }
+
+  pub fn report(&self, state: &RuntimeGroupState) {
+    match *state {
+      RuntimeGroupState::Pending => self.report_group_created(),
+      RuntimeGroupState::InProgress => self.report_group_started(),
+      RuntimeGroupState::Cancelled => self.report_group_cancelled(),
+      RuntimeGroupState::Stopped { .. } => self.report_group_stopped(),
+      RuntimeGroupState::Failed { .. } => self.report_group_failed(),
+      RuntimeGroupState::Completed { .. } => self.report_group_completed(),
+    }
+  }
+
+  fn report_group_created(&self) {
+    GroupEvent::emit_group_created(&self.app, &self.group);
+  }
+
+  fn report_group_started(&self) {
+    GroupEvent::emit_group_started(&self.app, &self.group);
+  }
+
+  fn report_group_failed(&self) {
+    GroupEvent::emit_group_failed(&self.app, &self.group);
+  }
+
+  fn report_group_completed(&self) {
+    GroupEvent::emit_group_completed(&self.app, &self.group);
+  }
+
+  fn report_group_cancelled(&self) {
+    GroupEvent::emit_group_cancelled(&self.app, &self.group);
+  }
+
+  fn report_group_stopped(&self) {
+    GroupEvent::emit_group_stopped(&self.app, &self.group);
+  }
+}
diff --git a/src-tauri/src/tasks/streams/desc.rs b/src-tauri/src/tasks/streams/desc.rs
deleted file mode 100644
index e86f4032a..000000000
--- a/src-tauri/src/tasks/streams/desc.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-use std::path::PathBuf;
-
-use serde::{Deserialize, Serialize};
-
-use crate::tasks::events::GEventStatus;
-
-#[derive(Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct PDesc<T>
-where
-  T: Clone + Serialize,
-{
-  pub task_id: u32,
-  pub task_group: Option<String>,
-  pub total: i64,
-  pub current: i64,
-  pub payload: T,
-  pub status: PStatus,
-}
-
-#[derive(Clone, Serialize, Deserialize, PartialEq, Eq)]
-pub enum PStatus {
-  Waiting,
-  Stopped,
-  Cancelled,
-  Completed,
-  InProgress,
-  Failed,
-}
-
-impl PStatus {
-  pub fn is_terminated(&self) -> bool {
-    matches!(self, PStatus::Cancelled | PStatus::Completed)
-  }
-
-  pub fn is_stopped(&self) -> bool {
-    *self == PStatus::Stopped
-  }
-
-  pub fn is_completed(&self) -> bool {
-    *self == PStatus::Completed
-  }
-
-  pub fn is_in_progress(&self) -> bool {
-    *self == PStatus::InProgress
-  }
-
-  pub fn is_cancelled(&self) -> bool {
-    *self == PStatus::Cancelled
-  }
-
-  pub fn is_waiting(&self) -> bool {
-    *self == PStatus::Waiting
-  }
-}
-
-impl<T> PDesc<T>
-where
-  T: Clone + Serialize + for<'de> Deserialize<'de>,
-{
-  pub fn new(
-    task_id: u32,
-    task_group: Option<String>,
-    total: i64,
-    payload: T,
-    status: PStatus,
-  ) -> Self {
-    Self {
-      task_id,
-      task_group,
-      total,
-      current: 0,
-      payload,
-      status,
-    }
-  }
-  pub fn save(&self, path: &PathBuf) -> std::io::Result<()> {
-    let file = std::fs::File::create(path)?;
-    serde_json::to_writer(file, self)?;
-    Ok(())
-  }
-
-  pub fn load(path: &PathBuf) -> std::io::Result<Self> {
-    let file = std::fs::File::open(path)?;
-    let desc: Self = serde_json::from_reader(file)?;
-    Ok(desc)
-  }
-
-  pub fn increment_progress(&mut self, size: i64) {
-    if self.status != PStatus::InProgress {
-      return;
-    }
-    self.current += size;
-  }
-
-  pub fn start(&mut self) {
-    self.status = PStatus::InProgress
-  }
-
-  pub fn stop(&mut self) {
-    if self.status == PStatus::InProgress || self.status == PStatus::Waiting {
-      self.status = PStatus::Stopped
-    }
-  }
-
-  pub fn cancel(&mut self) {
-    self.status = PStatus::Cancelled
-  }
-
-  pub fn resume(&mut self) {
-    if self.status == PStatus::Stopped {
-      self.status = PStatus::InProgress
-    }
-  }
-
-  pub fn complete(&mut self) {
-    self.status = PStatus::Completed;
-  }
-
-  pub fn fail(&mut self) {
-    self.status = PStatus::Failed;
-  }
-}
-
-#[derive(Clone, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-pub struct GDesc<T>
-where
-  T: Clone + Serialize,
-{
-  pub task_group: String,
-  pub task_descs: Vec<PDesc<T>>,
-  pub status: GEventStatus,
-}
diff --git a/src-tauri/src/tasks/streams/handle.rs b/src-tauri/src/tasks/streams/handle.rs
deleted file mode 100644
index d0309b4d8..000000000
--- a/src-tauri/src/tasks/streams/handle.rs
+++ /dev/null
@@ -1,119 +0,0 @@
-use super::desc::{PDesc, PStatus};
-use super::reporter::{Reporter, Sink};
-use serde::{Deserialize, Serialize};
-use std::path::PathBuf;
-use std::task::{Context, Waker};
-use tokio::time::{interval, Duration, Interval};
-
-pub struct PHandle<S, P>
-where
-  S: Sink,
-  P: Clone + Serialize + for<'de> Deserialize<'de>,
-{
-  pub interval: Interval,
-  pub desc: PDesc<P>,
-  pub path: PathBuf,
-  pub reporter: Reporter<S>,
-  pub waker: Option<Waker>,
-}
-
-impl<S, P> PHandle<S, P>
-where
-  S: Sink,
-  P: Clone + Serialize + for<'de> Deserialize<'de>,
-{
-  pub fn new(desc: PDesc<P>, duration: Duration, path: PathBuf, reporter: Reporter<S>) -> Self {
-    Self {
-      interval: interval(duration),
-      desc,
-      path,
-      reporter,
-      waker: None,
-    }
-  }
-
-  pub fn mark_stopped(&mut self) {
-    self.desc.stop();
-    self.desc.save(&self.path).unwrap();
-    self
-      .reporter
-      .report_stopped(self.desc.task_id, self.desc.task_group.as_deref());
-  }
-
-  pub fn mark_resumed(&mut self) {
-    self.desc.resume();
-    self.desc.save(&self.path).unwrap();
-    self.reporter.report_started(
-      self.desc.task_id,
-      self.desc.task_group.as_deref(),
-      self.desc.total,
-    );
-
-    // Wake up any waiting poll_next calls
-    if let Some(waker) = self.waker.take() {
-      waker.wake();
-    }
-  }
-
-  pub fn mark_cancelled(&mut self) {
-    self.desc.cancel();
-    self.desc.save(&self.path).unwrap();
-    self
-      .reporter
-      .report_cancelled(self.desc.task_id, self.desc.task_group.as_deref());
-  }
-
-  pub fn mark_completed(&mut self) {
-    self.desc.complete();
-    self.desc.save(&self.path).unwrap();
-    self
-      .reporter
-      .report_completion(self.desc.task_id, self.desc.task_group.as_deref());
-  }
-
-  pub fn mark_started(&mut self) {
-    self.desc.start();
-    self.desc.save(&self.path).unwrap();
-    self.reporter.report_started(
-      self.desc.task_id,
-      self.desc.task_group.as_deref(),
-      self.desc.total,
-    );
-  }
-
-  pub fn mark_failed(&mut self, reason: String) {
-    self.desc.fail();
-    self.desc.save(&self.path).unwrap();
-    self
-      .reporter
-      .report_failed(self.desc.task_id, self.desc.task_group.as_deref(), reason);
-  }
-
-  pub fn status(&self) -> &PStatus {
-    &self.desc.status
-  }
-
-  pub fn store_waker(&mut self, waker: Waker) {
-    self.waker = Some(waker);
-  }
-
-  pub fn set_total(&mut self, total: i64) {
-    if total > self.desc.total {
-      self.desc.total = total;
-      self.desc.save(&self.path).unwrap();
-      self.reporter.set_total(total);
-    }
-  }
-
-  pub fn report_progress(&mut self, cx: &mut Context<'_>, incr: i64) {
-    self.desc.increment_progress(incr);
-    if self.interval.poll_tick(cx).is_ready() {
-      self.desc.save(&self.path).unwrap();
-      self.reporter.report_progress(
-        self.desc.task_id,
-        self.desc.task_group.as_deref(),
-        self.desc.current,
-      );
-    }
-  }
-}
diff --git a/src-tauri/src/tasks/streams/mod.rs b/src-tauri/src/tasks/streams/mod.rs
deleted file mode 100644
index a9d4cbf14..000000000
--- a/src-tauri/src/tasks/streams/mod.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-pub mod desc;
-mod handle;
-pub mod reporter;
-mod unit;
-
-use futures::stream::FusedStream;
-use futures::Stream;
-use pin_project::pin_project;
-use serde::{Deserialize, Serialize};
-use std::pin::Pin;
-use std::sync::{Arc, RwLock};
-use std::task::{Context, Poll};
-use unit::Unit;
-
-pub use desc::{GDesc, PDesc};
-pub use handle::PHandle;
-use reporter::*;
-
-#[pin_project]
-pub struct ProgressStream<M, U, S, P>
-where
-  M: Stream<Item = U>,
-  U: Unit,
-  S: Sink,
-  P: Clone + Serialize + for<'de> Deserialize<'de>,
-{
-  #[pin]
-  stream: M,
-  handle: Arc<RwLock<PHandle<S, P>>>,
-}
-
-impl<M, U, S, P> ProgressStream<M, U, S, P>
-where
-  M: Stream<Item = U>,
-  U: Unit,
-  S: Sink,
-  P: Clone + Serialize + for<'de> Deserialize<'de>,
-{
-  pub fn new(stream: M, handle: Arc<RwLock<PHandle<S, P>>>) -> Self {
-    Self { stream, handle }
-  }
-}
-
-impl<M, U, S, P> Stream for ProgressStream<M, U, S, P>
-where
-  M: Stream<Item = U>,
-  U: Unit,
-  S: Sink,
-  P: Clone + Serialize + for<'de> Deserialize<'de>,
-{
-  type Item = U;
-
-  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-    {
-      let mut h = self.handle.write().unwrap();
-      let state = h.status();
-      if state.is_stopped() {
-        // Store the waker so we can be woken up when resumed
-        h.store_waker(cx.waker().clone());
-        return Poll::Pending;
-      }
-
-      if !state.is_in_progress() {
-        return Poll::Ready(None);
-      }
-    }
-
-    let p = self.project();
-
-    p.stream.poll_next(cx).map(|opt| {
-      let mut h = p.handle.write().unwrap();
-      if let Some(item) = &opt {
-        h.report_progress(cx, item.unit_size());
-      } else {
-        h.mark_completed();
-      }
-      opt
-    })
-  }
-}
-
-impl<M, U, S, P> FusedStream for ProgressStream<M, U, S, P>
-where
-  M: Stream<Item = U>,
-  U: Unit,
-  S: Sink,
-  P: Clone + Serialize + for<'de> Deserialize<'de>,
-{
-  fn is_terminated(&self) -> bool {
-    self.handle.read().unwrap().status().is_terminated()
-  }
-}
diff --git a/src-tauri/src/tasks/streams/reporter.rs b/src-tauri/src/tasks/streams/reporter.rs
deleted file mode 100644
index d2234c8ad..000000000
--- a/src-tauri/src/tasks/streams/reporter.rs
+++ /dev/null
@@ -1,124 +0,0 @@
-use super::desc::PDesc;
-use serde::{Deserialize, Serialize};
-use std::time::Duration;
-use tauri::AppHandle;
-
-pub trait Sink {
-  fn report_progress(
-    &self,
-    task_id: u32,
-    task_group: Option<&str>,
-    current: i64,
-    total: i64,
-    percentage: u32,
-    estimated_time: Option<f64>,
-    speed: f64,
-  );
-  fn report_completion(&self, task_id: u32, task_group: Option<&str>);
-  fn report_stopped(&self, task_id: u32, task_group: Option<&str>);
-  fn report_cancelled(&self, task_id: u32, task_group: Option<&str>);
-  fn report_started(&self, task_id: u32, task_group: Option<&str>, total: i64);
-  fn report_failed(&self, task_id: u32, task_group: Option<&str>, reason: String);
-}
-
-pub struct Reporter<S> {
-  total: i64,
-  last_reported: i64,
-  interval: Duration,
-  sink: S,
-}
-
-impl<S> Reporter<S>
-where
-  S: Sink,
-{
-  pub fn new(total: i64, interval: Duration, sink: S) -> Self {
-    Self {
-      total,
-      last_reported: 0,
-      interval,
-      sink,
-    }
-  }
-
-  pub fn set_total(&mut self, total: i64) {
-    self.total = total;
-  }
-
-  pub fn from_desc_interval<P: Clone + Serialize + for<'de> Deserialize<'de>>(
-    desc: &PDesc<P>,
-    interval: &Duration,
-    sink: S,
-  ) -> Self {
-    Self {
-      total: desc.total,
-      last_reported: desc.current,
-      interval: *interval,
-      sink,
-    }
-  }
-}
-
-impl<S> Reporter<S>
-where
-  S: Sink,
-{
-  pub fn report_started(&self, task_id: u32, task_group: Option<&str>, total: i64) {
-    self.sink.report_started(task_id, task_group, total);
-  }
-
-  pub fn report_stopped(&self, task_id: u32, task_group: Option<&str>) {
-    self.sink.report_stopped(task_id, task_group);
-  }
-
-  pub fn report_cancelled(&self, task_id: u32, task_group: Option<&str>) {
-    self.sink.report_cancelled(task_id, task_group);
-  }
-
-  pub fn report_completion(&self, task_id: u32, task_group: Option<&str>) {
-    self.sink.report_completion(task_id, task_group);
-  }
-
-  pub fn report_progress(&mut self, task_id: u32, task_group: Option<&str>, current: i64) {
-    let percentage = if self.total > 0 {
-      (current as f64 / self.total as f64 * 100.0).round() as u32
-    } else {
-      0
-    };
-
-    let estimated_time = if self.last_reported > 0 && current > self.last_reported {
-      Some(
-        (self.total - current) as f64 / (current - self.last_reported) as f64
-          * self.interval.as_secs_f64(),
-      )
-    } else {
-      None
-    };
-
-    let speed = (current - self.last_reported) as f64 / self.interval.as_secs_f64();
-
-    self.sink.report_progress(
-      task_id,
-      task_group,
-      current,
-      self.total,
-      percentage,
-      estimated_time,
-      speed,
-    );
-
-    self.last_reported = current;
-  }
-
-  pub fn report_failed(&self, task_id: u32, task_group: Option<&str>, reason: String) {
-    self.sink.report_failed(task_id, task_group, reason);
-  }
-}
-
-pub struct EventPayload {}
-
-pub struct EventReporter {
-  app: AppHandle,
-  task_id: u32,
-  total: i64,
-}
diff --git a/src-tauri/src/tasks/streams/unit.rs b/src-tauri/src/tasks/streams/unit.rs
deleted file mode 100644
index 2a6d649a9..000000000
--- a/src-tauri/src/tasks/streams/unit.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-use tokio_util::bytes::Bytes;
-
-pub trait Unit {
-  fn unit_size(&self) -> i64;
-}
-
-impl<T> Unit for &T {
-  fn unit_size(&self) -> i64 {
-    1
-  }
-}
-
-impl Unit for Bytes {
-  fn unit_size(&self) -> i64 {
-    self.len() as i64
-  }
-}
-
-impl<T, E> Unit for Result<T, E>
-where
-  T: Unit,
-{
-  fn unit_size(&self) -> i64 {
-    match self {
-      Ok(b) => (*b).unit_size(),
-      Err(_) => 0,
-    }
-  }
-}
-
-impl<T> Unit for Option<T>
-where
-  T: Unit,
-{
-  fn unit_size(&self) -> i64 {
-    match self {
-      Some(b) => (*b).unit_size(),
-      None => 0,
-    }
-  }
-}
diff --git a/src-tauri/src/utils/web.rs b/src-tauri/src/utils/web.rs
index 86711111d..b73d30d2a 100644
--- a/src-tauri/src/utils/web.rs
+++ b/src-tauri/src/utils/web.rs
@@ -12,58 +12,60 @@ use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
 use std::sync::Mutex;
 use std::time::Duration;

-/// Builds a reqwest client with SJMCL version header and proxy support.
-/// Defaults to 10s timeout.
-///
-/// # Arguments
-///
-/// * `app` - The Tauri AppHandle.
-/// * `use_version_header` - Whether to include the SJMCL version header.
-/// * `use_proxy` - Whether to use the proxy settings from the config.
-///
-/// TODO: support more custom config from reqwest::Config
-/// FIXME: Seems like hyper will panic if this client is shared across threads.
-///
-/// # Returns
-///
-/// A reqwest::Client instance.
-///
-/// # Example
-///
-/// ```rust
-/// let client = build_sjmcl_client(&app, true, true);
-/// ```
-pub fn build_sjmcl_client(app: &AppHandle, use_version_header: bool, use_proxy: bool) -> Client {
-  let mut builder = ClientBuilder::new()
-    .timeout(Duration::from_secs(10))
-    .tcp_keepalive(Duration::from_secs(10));
+const CONNECT_TIMEOUT: Duration = Duration::from_secs(60);
+const READ_TIMEOUT: Duration = Duration::from_secs(60);
+const RETRY_CEILING: Duration = Duration::from_secs(3600);

-  if let Ok(config) = app.state::<Mutex<LauncherConfig>>().lock() {
-    if use_version_header {
-      // According to the User-Agent requirements of mozilla and BMCLAPI, the User-Agent is set to start with ${NAME}/${VERSION}
-      // https://github.com/MCLF-CN/docs/issues/2
-      // https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Reference/Headers/User-Agent
-      if let Ok(header_value) = format!("SJMCL/{}", &config.basic_info.launcher_version).parse() {
-        let mut headers = HeaderMap::new();
-        headers.insert("User-Agent", header_value);
-        builder = builder.default_headers(headers);
-      }
+pub struct WebConfig {
+  connect_timeout: Duration,
+  read_timeout: Duration,
+  use_version_header: bool,
+  use_proxy: bool,
+}
+
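> Note: the constants above split the old blanket 10s `timeout` into separate connect and read timeouts. A total-request timeout would kill any download longer than the limit, while a read timeout only fires when the stream stalls between chunks. A self-contained sketch of that split using plain reqwest (0.12+, which the diff's own `read_timeout` call implies); the values mirror the constants here:

```rust
use std::time::Duration;

// Connect timeout bounds TCP/TLS setup; read timeout bounds gaps between
// received chunks, so long-but-flowing downloads are never cut off.
fn build_client() -> reqwest::Result<reqwest::Client> {
    reqwest::Client::builder()
        .connect_timeout(Duration::from_secs(60))
        .read_timeout(Duration::from_secs(60))
        .build()
}

fn main() {
    let _client = build_client().expect("client should build");
}
```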
+/// Builds a reqwest client with SJMCL version header and proxy support.
+/// Default timeout is 60 seconds for both connect and read.
+impl Default for WebConfig {
+  fn default() -> Self {
+    WebConfig {
+      connect_timeout: CONNECT_TIMEOUT,
+      read_timeout: READ_TIMEOUT,
+      use_version_header: true,
+      use_proxy: false,
+    }
+  }
+}

-    if use_proxy && config.download.proxy.enabled {
-      let proxy_cfg = &config.download.proxy;
-      let proxy_url = match proxy_cfg.selected_type {
-        ProxyType::Http => format!("http://{}:{}", proxy_cfg.host, proxy_cfg.port),
-        ProxyType::Socks => format!("socks5h://{}:{}", proxy_cfg.host, proxy_cfg.port),
-      };
+impl WebConfig {
+  pub fn build(self, app: &AppHandle) -> Client {
+    let mut builder = ClientBuilder::new()
+      .connect_timeout(self.connect_timeout)
+      .read_timeout(self.read_timeout);
+    if let Ok(config) = app.state::<Mutex<LauncherConfig>>().lock() {
+      if self.use_version_header {
+        // According to the User-Agent requirements of mozilla and BMCLAPI, the User-Agent is set to start with ${NAME}/${VERSION}
+        // https://github.com/MCLF-CN/docs/issues/2
+        // https://developer.mozilla.org/zh-CN/docs/Web/HTTP/Reference/Headers/User-Agent
+        if let Ok(header_value) = format!("SJMCL/{}", &config.basic_info.launcher_version).parse() {
+          let mut headers = HeaderMap::new();
+          headers.insert("User-Agent", header_value);
+          builder = builder.default_headers(headers);
+        }
+      }
+      if self.use_proxy && config.download.proxy.enabled {
+        let proxy_cfg = &config.download.proxy;
+        let proxy_url = match proxy_cfg.selected_type {
+          ProxyType::Http => format!("http://{}:{}", proxy_cfg.host, proxy_cfg.port),
+          ProxyType::Socks => format!("socks5h://{}:{}", proxy_cfg.host, proxy_cfg.port),
+        };

-      if let Ok(proxy) = Proxy::all(&proxy_url) {
-        builder = builder.proxy(proxy);
+        if let Ok(proxy) = Proxy::all(&proxy_url) {
+          builder = builder.proxy(proxy);
+        }
       }
     }
+    builder.build().unwrap_or_else(|_| Client::new())
   }
-
-  builder.build().unwrap_or_else(|_| Client::new())
 }

 struct SJMCLRetryableStrategy;
@@ -91,7 +93,7 @@ impl RetryableStrategy for SJMCLRetryableStrategy {
 pub fn with_retry(client: Client) -> ClientWithMiddleware {
   ClientWithMiddlewareBuilder::new(client)
     .with(RetryTransientMiddleware::new_with_policy_and_strategy(
-      ExponentialBackoff::builder().build_with_total_retry_duration(Duration::from_secs(3600)),
+      ExponentialBackoff::builder().build_with_total_retry_duration(RETRY_CEILING),
       SJMCLRetryableStrategy {},
     ))
     .build()
diff --git a/src/components/download-indicator.tsx b/src/components/download-indicator.tsx
index 3e2164f42..e0577992f 100644
--- a/src/components/download-indicator.tsx
+++ b/src/components/download-indicator.tsx
@@ -13,7 +13,7 @@ import { useTranslation } from "react-i18next";
 import { LuArrowDownToLine, LuCheck, LuCircleAlert } from "react-icons/lu";
 import { useLauncherConfig } from "@/contexts/config";
 import { useTaskContext } from "@/contexts/task";
-import { GTaskEventStatusEnums } from "@/models/task";
+import { RuntimeStateEnums } from "@/models/task";

 export const DownloadIndicator: React.FC = () => {
   const router = useRouter();
@@ -27,13 +27,12 @@ export const DownloadIndicator: React.FC = () => {

   const isAllCompleted =
     tasks.every(
       (group) =>
-        group.status === GTaskEventStatusEnums.Completed ||
-        group.status === GTaskEventStatusEnums.Cancelled
-    ) &&
-    tasks.some((group) => group.status === GTaskEventStatusEnums.Completed);
+        group.status === RuntimeStateEnums.Completed ||
+        group.status === RuntimeStateEnums.Cancelled
+    ) && tasks.some((group) => group.status === RuntimeStateEnums.Completed);

   const hasError = tasks.some(
-    (task) => task.status === GTaskEventStatusEnums.Failed
+    (task) => task.status ===
RuntimeStateEnums.Failed ); useEffect(() => { diff --git a/src/components/modals/download-specific-resource-modal.tsx b/src/components/modals/download-specific-resource-modal.tsx index 7fea702e0..f73990585 100644 --- a/src/components/modals/download-specific-resource-modal.tsx +++ b/src/components/modals/download-specific-resource-modal.tsx @@ -58,7 +58,7 @@ import { OtherResourceInfo, OtherResourceVersionPack, } from "@/models/resource"; -import { TaskParam, TaskTypeEnums } from "@/models/task"; +import { RuntimeTaskParam, TaskTypeEnums } from "@/models/task"; import { InstanceService } from "@/services/instance"; import { ResourceService } from "@/services/resource"; import { TaskService } from "@/services/task"; @@ -129,7 +129,7 @@ const DownloadSpecificResourceModal: React.FC< }; const handleScheduleProgressiveTaskGroup = useCallback( - (taskGroup: string, params: TaskParam[]) => { + (taskGroup: string, params: RuntimeTaskParam[]) => { TaskService.scheduleProgressiveTaskGroup(taskGroup, params).then( (response) => { // success toast will now be called by task context group listener @@ -243,7 +243,7 @@ const DownloadSpecificResourceModal: React.FC< src: item.downloadUrl, dest: savepath, sha1: item.sha1, - taskType: TaskTypeEnums.Download, + type: TaskTypeEnums.Download, }, ]); @@ -604,7 +604,7 @@ const DownloadSpecificResourceModal: React.FC< )} - {resource.mcmodId && ( + {!!resource.mcmodId && ( void; handleCancelProgressiveTaskGroup: (taskGroup: string) => void; handleResumeProgressiveTaskGroup: (taskGroup: string) => void; handleStopProgressiveTaskGroup: (taskGroup: string) => void; + handleRetryProgressiveTaskGroup: (taskGroup: string) => void; } export const TaskContext = createContext( @@ -55,19 +56,13 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ const { openSharedModal } = useSharedModals(); const [tasks, setTasks] = useState([]); const [generalPercent, setGeneralPercent] = useState(); + const [needRefresh, setNeedRefresh] = useState(true); const { t } = useTranslation(); const loadingToastRef = React.useRef(null); - const updateGroupInfo = useCallback((group: TaskGroupDesc) => { - if (group.status === GTaskEventStatusEnums.Completed) { - group.taskDescs.forEach((t) => { - t.status = TaskDescStatusEnums.Completed; - t.current = t.total; // Ensure current is set to total for completed tasks - }); - } - + const updateGroupDesc = useCallback((group: TaskGroupDesc) => { group.finishedCount = group.taskDescs.filter( - (t) => t.status === TaskDescStatusEnums.Completed + (t) => t.status === RuntimeStateEnums.Completed ).length; let knownTotalArr = group.taskDescs.filter((t) => t.total && t.total > 0); @@ -90,7 +85,7 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ group.estimatedTime = undefined; group.taskDescs.forEach((t) => { - if (t.status === TaskDescStatusEnums.InProgress && t.estimatedTime) { + if (t.status === RuntimeStateEnums.InProgress && t.estimatedTime) { if ( !group.estimatedTime || group.estimatedTime.secs < t.estimatedTime.secs @@ -103,14 +98,14 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ group.taskDescs.sort((a, b) => { let level = (desc: TaskDesc) => { switch (desc.status) { - case TaskDescStatusEnums.Failed: + case RuntimeStateEnums.Failed: return 0; - case TaskDescStatusEnums.InProgress: + case RuntimeStateEnums.InProgress: return 1; - case TaskDescStatusEnums.Waiting: + case RuntimeStateEnums.Pending: return 2; - case TaskDescStatusEnums.Completed: - return 4; 
+ case RuntimeStateEnums.Completed: + return 999; default: return 3; } @@ -119,31 +114,52 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ }); }, []); + const convertGroupSnapshotToDesc = ( + snapshot: RuntimeGroupDescSnapshot + ): TaskGroupDesc => { + return { + taskGroup: snapshot.name, + taskDescs: Object.entries(snapshot.taskDescMap).map( + ([id, desc]): TaskDesc => ({ + taskId: parseInt(id), + status: desc.state.type, + total: desc.total, + current: desc.current, + startAt: desc.startAt, + createdAt: desc.createdAt, + filename: desc.filename, + dest: desc.dest, + progress: (desc.current * 100) / desc.total, + reason: + desc.state.type === RuntimeStateEnums.Failed + ? desc.state.reason + : undefined, + }) + ), + status: snapshot.state.type, + finishedCount: 0, + progress: 0, + }; + }; + const handleRetrieveProgressTasks = useCallback(() => { TaskService.retrieveProgressiveTaskList().then((response) => { if (response.status === "success") { console.log("Retrieved progressive tasks:", response.data); // info(JSON.stringify(response.data)); - setTasks((prevTasks) => { - let tasks = response.data - .map((group) => { - let prevGroup = prevTasks?.find( - (t) => t.taskGroup === group.taskGroup - ); - if (prevGroup) return prevGroup; - updateGroupInfo(group); - return group; - }) - .filter( - (group) => group.status !== GTaskEventStatusEnums.Cancelled - ); - tasks.sort((a, b) => { - let { timestamp: aTime } = parseTaskGroup(a.taskGroup); - let { timestamp: bTime } = parseTaskGroup(b.taskGroup); - return bTime - aTime; // Sort by timestamp descending - }); - return tasks; + let newTasks = response.data + .map((snapshot) => { + let groupDesc = convertGroupSnapshotToDesc(snapshot); + updateGroupDesc(groupDesc); + return groupDesc; + }) + .filter((group) => group.status !== RuntimeStateEnums.Cancelled); + newTasks.sort((a, b) => { + let { timestamp: aTime } = parseTaskGroup(a.taskGroup); + let { timestamp: bTime } = parseTaskGroup(b.taskGroup); + return bTime - aTime; // Sort by timestamp descending }); + setTasks(newTasks); } else { toast({ title: response.message, @@ -152,18 +168,27 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ }); } }); - }, [toast, updateGroupInfo]); + }, [toast, updateGroupDesc]); useEffect(() => { - handleRetrieveProgressTasks(); - }, [handleRetrieveProgressTasks]); + if (needRefresh) { + handleRetrieveProgressTasks(); + setNeedRefresh(false); + } + }, [handleRetrieveProgressTasks, needRefresh]); const handleScheduleProgressiveTaskGroup = useCallback( - (taskGroup: string, params: TaskParam[]) => { + (taskGroup: string, params: RuntimeTaskParam[]) => { TaskService.scheduleProgressiveTaskGroup(taskGroup, params).then( (response) => { - // success toast will now be called by task context group listener - if (response.status !== "success") { + if (response.status === "success") { + console.log("Scheduled progressive tasks:", response.data); + let groupDesc = convertGroupSnapshotToDesc(response.data); + updateGroupDesc(groupDesc); + setTasks((prevTasks) => { + return [groupDesc, ...prevTasks]; + }); + } else { toast({ title: response.message, description: response.details, @@ -173,7 +198,7 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ } ); }, - [toast] + [toast, updateGroupDesc] ); const handleCancelProgressiveTaskGroup = useCallback( @@ -221,151 +246,121 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ [toast] ); + const 
handleRetryProgressiveTaskGroup = useCallback( + (taskGroup: string) => { + TaskService.retryProgressiveTaskGroup(taskGroup).then((response) => { + if (response.status !== "success") { + toast({ + title: response.message, + description: response.details, + status: "error", + }); + } + }); + }, + [toast] + ); + useEffect(() => { const unlisten = TaskService.onProgressiveTaskUpdate( - (payload: PTaskEventPayload) => { - // info( - // `Received task update: ${payload.id}, status: ${payload.event.status}` - // ); + (payload: TaskEvent) => { + console.log( + `Received task update: ${payload.id}, status: ${payload.event.status}` + ); setTasks((prevTasks) => { const group = prevTasks?.find( (t) => t.taskGroup === payload.taskGroup ); - switch (payload.event.status) { - case PTaskEventStatusEnums.Created: { - if (group) { - if (group.taskDescs.some((t) => t.taskId === payload.id)) { - // info( - // `Task ${payload.id} already exists in group ${payload.taskGroup}` - // ); - } else if ( - group.taskDescs.some( - (t) => - t.payload.dest === - (payload.event as CreatedPTaskEventStatus).desc.payload - .dest - ) - ) { - // It' a retrial task emitted from the backend - group.taskDescs = group.taskDescs.map((t) => { - if ( - t.payload.dest === - (payload.event as CreatedPTaskEventStatus).desc.payload - .dest - ) { - t = (payload.event as CreatedPTaskEventStatus).desc; - } - return t; - }); - } else { - group.taskDescs.unshift(payload.event.desc); - // info(`Added task ${payload.id} to group ${payload.taskGroup}`); - updateGroupInfo(group); - } - } else { - // info(`Creating new task group ${payload.taskGroup}`); - // Create a new task group if it doesn't exist - let newGroup: TaskGroupDesc = { - status: GTaskEventStatusEnums.Started, - taskGroup: payload.taskGroup, - taskDescs: [payload.event.desc], - }; - updateGroupInfo(newGroup); - return [newGroup, ...(prevTasks || [])]; - } - break; - } + if (!group) { + setNeedRefresh(true); + return prevTasks; + } - case PTaskEventStatusEnums.Started: { - if (!group) return prevTasks; + switch (payload.event.status) { + case TaskEventPayloadEnums.Started: { group.taskDescs = group.taskDescs.map((t) => { if (t.taskId === payload.id) { - t.status = TaskDescStatusEnums.InProgress; - t.total = (payload.event as StartedPTaskEventStatus).total; + t.status = RuntimeStateEnums.InProgress; + t.total = (payload.event as StartedTaskEventPayload).total; } return t; }); - updateGroupInfo(group); + updateGroupDesc(group); break; } - case PTaskEventStatusEnums.Completed: { - if (!group) return prevTasks; + case TaskEventPayloadEnums.Completed: { group.taskDescs = group.taskDescs.map((t) => { if (t.taskId === payload.id) { - t.status = TaskDescStatusEnums.Completed; + t.status = RuntimeStateEnums.Completed; t.current = t.total; } return t; }); // info(`Task ${payload.id} completed in group ${payload.taskGroup}`); - updateGroupInfo(group); + updateGroupDesc(group); break; } - case PTaskEventStatusEnums.Stopped: { - if (!group) return prevTasks; + case TaskEventPayloadEnums.Stopped: { group.taskDescs = group.taskDescs.map((t) => { if (t.taskId === payload.id) { - t.status = TaskDescStatusEnums.Stopped; + t.status = RuntimeStateEnums.Stopped; } return t; }); - updateGroupInfo(group); + updateGroupDesc(group); break; } - case PTaskEventStatusEnums.Cancelled: { - if (!group) return prevTasks; + case TaskEventPayloadEnums.Cancelled: { group.taskDescs = group.taskDescs.map((t) => { if (t.taskId === payload.id) { - t.status = TaskDescStatusEnums.Cancelled; + t.status = 
RuntimeStateEnums.Cancelled; } return t; }); - updateGroupInfo(group); + updateGroupDesc(group); // info(`Task ${payload.id} cancelled in group ${payload.taskGroup}`); break; } - case PTaskEventStatusEnums.InProgress: { - if (!group) return prevTasks; + case TaskEventPayloadEnums.InProgress: { group.taskDescs = group.taskDescs.map((t) => { if (t.taskId === payload.id) { t.current = ( - payload.event as InProgressPTaskEventStatus + payload.event as InProgressTaskEventPayload ).current; - t.status = TaskDescStatusEnums.InProgress; + t.status = RuntimeStateEnums.InProgress; t.estimatedTime = ( - payload.event as InProgressPTaskEventStatus + payload.event as InProgressTaskEventPayload ).estimatedTime; - t.speed = (payload.event as InProgressPTaskEventStatus).speed; + t.speed = (payload.event as InProgressTaskEventPayload).speed; } return t; }); - updateGroupInfo(group); + updateGroupDesc(group); // info( // `Task ${payload.id} in progress in group ${payload.taskGroup}` // ); break; } - case PTaskEventStatusEnums.Failed: { + case TaskEventPayloadEnums.Failed: { console.error( `Task ${payload.id} failed in group ${payload.taskGroup}: ${ - (payload.event as FailedPTaskEventStatus).reason + (payload.event as FailedTaskEventPayload).reason }` ); - if (!group) return prevTasks; group.taskDescs = group.taskDescs.map((t) => { if (t.taskId === payload.id) { - t.status = TaskDescStatusEnums.Failed; - t.reason = (payload.event as FailedPTaskEventStatus).reason; + t.status = RuntimeStateEnums.Failed; + t.reason = (payload.event as FailedTaskEventPayload).reason; } return t; }); - updateGroupInfo(group); + updateGroupDesc(group); // info(`Task ${payload.id} failed in group ${payload.taskGroup}`); break; } @@ -382,139 +377,157 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({ return () => { unlisten(); }; - }, [t, toast, updateGroupInfo]); + }, [t, toast, updateGroupDesc]); useEffect(() => { - const unlisten = TaskService.onTaskGroupUpdate( - (payload: GTaskEventPayload) => { - console.log(`Received task group update: ${payload.event}`); - setTasks((prevTasks) => { - let newTasks = prevTasks.map((task) => { - if (task.taskGroup === payload.taskGroup) { - task.status = payload.event; - if (payload.event === GTaskEventStatusEnums.Completed) { - task.taskDescs.forEach((t) => { + const unlisten = TaskService.onTaskGroupUpdate((payload: GroupEvent) => { + console.log(`Received task group update: ${payload.event}`); + if (payload.event == GroupEventPayloadEnums.Created) { + setNeedRefresh(true); + return; + } + setTasks((prevTasks) => { + const { name, version } = parseTaskGroup(payload.taskGroup); + let newTasks = prevTasks.map((group) => { + if (group.taskGroup === payload.taskGroup) { + switch (payload.event) { + case GroupEventPayloadEnums.Started: + group.status = RuntimeStateEnums.InProgress; + break; + + case GroupEventPayloadEnums.Completed: + toast({ + status: "success", + title: t(`Services.task.onTaskGroupUpdate.status.Completed`, { + param: t(`DownloadTasksPage.task.${name}`, { + param: version || "", + }), + }), + }); + group.status = RuntimeStateEnums.Completed; + group.taskDescs.forEach((t) => { if ( - t.status === TaskDescStatusEnums.Waiting || - t.status === TaskDescStatusEnums.InProgress + t.status === RuntimeStateEnums.Pending || + t.status === RuntimeStateEnums.InProgress ) { - t.status = TaskDescStatusEnums.Completed; + t.status = RuntimeStateEnums.Completed; t.current = t.total; - } else if (t.status === TaskDescStatusEnums.Failed) { - task.status = 
GTaskEventStatusEnums.Failed; - payload.event = GTaskEventStatusEnums.Failed; } }); - } - } - return task; - }); - - const { name, version } = parseTaskGroup(payload.taskGroup); - - toast({ - status: - payload.event === GTaskEventStatusEnums.Failed - ? "error" - : "success", - title: t( - `Services.task.onTaskGroupUpdate.status.${payload.event}`, - { - param: t(`DownloadTasksPage.task.${name}`, { - param: version || "", - }), - } - ), - }); - - if (payload.event === GTaskEventStatusEnums.Completed) { - switch (name) { - case "game-client": - getInstanceList(true); - break; - case "forge-libraries": - case "neoforge-libraries": - if (version) { - let instanceName = getInstanceList()?.find( - (i) => i.id === version - )?.name; - if (loadingToastRef.current) return newTasks; - loadingToastRef.current = toast({ - title: t( - "Services.instance.finishModLoaderInstall.loading", - { - instanceName, - } - ), - status: "loading", - }); - InstanceService.finishModLoaderInstall(version).then( - (response) => { - if (loadingToastRef.current) { - closeToast(loadingToastRef.current); - loadingToastRef.current = null; - } - if (response.status === "success") { - toast({ - title: response.message, - status: "success", - }); - } else { - toast({ - title: response.message, - description: response.details, - status: "error", - }); - } + switch (name) { + case "game-client": + getInstanceList(true); + break; + case "forge-libraries": + case "neoforge-libraries": + if (version) { + let instanceName = getInstanceList()?.find( + (i) => i.id === version + )?.name; + if (loadingToastRef.current) break; + loadingToastRef.current = toast({ + title: t( + "Services.instance.finishModLoaderInstall.loading", + { + instanceName, + } + ), + status: "loading", + }); + InstanceService.finishModLoaderInstall(version).then( + (response) => { + if (loadingToastRef.current) { + closeToast(loadingToastRef.current); + loadingToastRef.current = null; + } + if (response.status === "success") { + toast({ + title: response.message, + status: "success", + }); + } else { + toast({ + title: response.message, + description: response.details, + status: "error", + }); + } + } + ); } - ); + break; + case "mod": + case "mod-update": + emit( + "instance:refresh-resource-list", + OtherResourceType.Mod + ); + break; + case "resourcepack": + emit( + "instance:refresh-resource-list", + OtherResourceType.ResourcePack + ); + break; + case "shader": + emit( + "instance:refresh-resource-list", + OtherResourceType.ShaderPack + ); + break; + case "modpack": + if (group.taskDescs.length > 0) { + openSharedModal("import-modpack", { + path: group.taskDescs[0].dest, + }); + } + break; + default: + break; } break; - case "mod": - case "mod-update": - emit("instance:refresh-resource-list", OtherResourceType.Mod); - break; - case "resourcepack": - emit( - "instance:refresh-resource-list", - OtherResourceType.ResourcePack - ); + + case GroupEventPayloadEnums.Failed: + toast({ + status: "error", + title: t(`Services.task.onTaskGroupUpdate.status.Failed`, { + param: t(`DownloadTasksPage.task.${name}`, { + param: version || "", + }), + }), + }); + group.status = RuntimeStateEnums.Failed; break; - case "shader": - emit( - "instance:refresh-resource-list", - OtherResourceType.ShaderPack - ); + + case GroupEventPayloadEnums.Stopped: + group.status = RuntimeStateEnums.Stopped; break; - case "modpack": - let group = newTasks.find( - (t) => t.taskGroup === payload.taskGroup - ); - if (group && group.taskDescs.length > 0) { - openSharedModal("import-modpack", { - path: 
group.taskDescs[0].payload.dest,
-                  });
-                }
                 break;
+
+            case GroupEventPayloadEnums.Failed:
+              toast({
+                status: "error",
+                title: t(`Services.task.onTaskGroupUpdate.status.Failed`, {
+                  param: t(`DownloadTasksPage.task.${name}`, {
+                    param: version || "",
+                  }),
+                }),
+              });
+              group.status = RuntimeStateEnums.Failed;
+              break;
+
+            case GroupEventPayloadEnums.Stopped:
+              group.status = RuntimeStateEnums.Stopped;
+              break;
+
+            case GroupEventPayloadEnums.Cancelled:
+              group.status = RuntimeStateEnums.Cancelled;
+              break;
+
             default:
               break;
           }
+          updateGroupDesc(group);
         }
-
-        return newTasks;
+        return group;
       });
+
+      return newTasks;
     });

     return () => {
       unlisten();
     };
-  }, [closeToast, getInstanceList, t, toast, updateGroupInfo, openSharedModal]);
+  }, [closeToast, getInstanceList, t, toast, updateGroupDesc, openSharedModal]);

   useEffect(() => {
     if (!tasks || !tasks.length) setGeneralPercent(undefined);
     else {
       let filteredTasks = tasks.filter(
-        (t) => t.status === GTaskEventStatusEnums.Started
+        (t) => t.status === RuntimeStateEnums.InProgress
       );

       if (filteredTasks.length === 0) setGeneralPercent(undefined);
@@ -538,6 +551,7 @@ export const TaskContextProvider: React.FC<{ children: React.ReactNode }> = ({
         handleCancelProgressiveTaskGroup,
         handleResumeProgressiveTaskGroup,
         handleStopProgressiveTaskGroup,
+        handleRetryProgressiveTaskGroup,
       }}
     >
       {children}
diff --git a/src/models/task.ts b/src/models/task.ts
index 7ad2b0fb5..ac83b343d 100644
--- a/src/models/task.ts
+++ b/src/models/task.ts
@@ -1,18 +1,18 @@
 export enum TaskTypeEnums {
-  Download = "download",
+  Download = "Download",
 }

 export type TaskType = `${TaskTypeEnums}`;

-export interface DownloadTaskParam {
-  taskType: TaskTypeEnums.Download;
+export interface DownloadRuntimeTaskParam {
+  type: TaskTypeEnums.Download;
   src: string;
   dest: string; // destination path
   filename?: string; // destination filename
   sha1?: string;
 }

-export type TaskParam = DownloadTaskParam;
+export type RuntimeTaskParam = DownloadRuntimeTaskParam;

 export interface DownloadTaskPayload {
   taskType: TaskTypeEnums.Download;
@@ -24,22 +24,75 @@

 export type TaskPayload = DownloadTaskPayload;

-export enum TaskDescStatusEnums {
+export enum RuntimeStateEnums {
   Stopped = "Stopped",
   Cancelled = "Cancelled",
   Completed = "Completed",
   InProgress = "InProgress",
   Failed = "Failed",
-  Waiting = "Waiting",
+  Pending = "Pending",
+}
+
+export interface StoppedRuntimeState {
+  type: RuntimeStateEnums.Stopped;
+  stoppedAt: number;
+}
+
+export interface FailedRuntimeState {
+  type: RuntimeStateEnums.Failed;
+  reason: string;
+}
+
+export interface InProgressRuntimeState {
+  type: RuntimeStateEnums.InProgress;
+}
+
+export interface CompletedRuntimeState {
+  type: RuntimeStateEnums.Completed;
+  completedAt: number;
+}
+
+export interface CancelledRuntimeState {
+  type: RuntimeStateEnums.Cancelled;
+}
+
+export interface PendingRuntimeState {
+  type: RuntimeStateEnums.Pending;
+}
+
+export type RuntimeState =
+  | StoppedRuntimeState
+  | FailedRuntimeState
+  | InProgressRuntimeState
+  | CompletedRuntimeState
+  | CancelledRuntimeState
+  | PendingRuntimeState;
+
+export interface RuntimeTaskDescSnapshot {
+  state: RuntimeState;
+  total: number;
+  current: number;
+  startAt: number;
+  createdAt: number;
+  filename: string;
+  dest: string;
+}
+
+export interface RuntimeGroupDescSnapshot {
+  name: string;
+  state: RuntimeState;
+  taskDescMap: Record<number, RuntimeTaskDescSnapshot>;
 }

 export interface TaskDesc {
   taskId: number;
-  taskGroup: string | null;
-  payload: TaskPayload;
-  current: number;
+  status: RuntimeStateEnums;
   total: number;
-  status?: TaskDescStatusEnums;
+  current: number;
+  startAt: number;
+  createdAt: number;
+  filename: string;
+  dest: string;
   progress?: number;
   reason?: string;
   estimatedTime?: Duration; // estimated time remaining in seconds
@@ -49,15 +102,14 @@ export interface TaskGroupDesc {
diff --git a/src/pages/downloads.tsx b/src/pages/downloads.tsx
index 1ae1467b0..97724290a 100644
--- a/src/pages/downloads.tsx
+++ b/src/pages/downloads.tsx
@@ -25,12 +25,7 @@
 import { OptionItem, OptionItemGroup } from "@/components/common/option-item";
 import { Section } from "@/components/common/section";
 import { useLauncherConfig } from "@/contexts/config";
 import { useTaskContext } from "@/contexts/task";
-import {
-  GTaskEventStatusEnums,
-  TaskDesc,
-  TaskDescStatusEnums,
-  TaskGroupDesc,
-} from "@/models/task";
+import { RuntimeStateEnums, TaskDesc, TaskGroupDesc } from "@/models/task";
 import { formatTimeInterval } from "@/utils/datetime";
 import { formatByteSize } from "@/utils/string";
 import { parseTaskGroup } from "@/utils/task";
@@ -43,10 +38,10 @@ export const DownloadTasksPage = () => {
   const {
     tasks,
-    handleScheduleProgressiveTaskGroup,
     handleCancelProgressiveTaskGroup,
     handleStopProgressiveTaskGroup,
     handleResumeProgressiveTaskGroup,
+    handleRetryProgressiveTaskGroup,
   } = useTaskContext();

   const [taskGroupList, setTaskGroupList] = useState<
@@ -127,44 +122,44 @@ export const DownloadTasksPage = () => {
               {group.finishedCount} / {group.taskDescs.length}
-              {group.status === GTaskEventStatusEnums.Started &&
+              {group.status === RuntimeStateEnums.InProgress &&
                 group.estimatedTime && (
                   {formatTimeInterval(group.estimatedTime.secs)}
                 )}
-              {group.status === GTaskEventStatusEnums.Stopped && (
+              {group.status === RuntimeStateEnums.Stopped && (
                 {t("DownloadTasksPage.label.paused")}
               )}
-              {group.status === GTaskEventStatusEnums.Completed && (
+              {group.status === RuntimeStateEnums.Completed && (
                 {t("DownloadTasksPage.label.completed")}
               )}
-              {(group.status === GTaskEventStatusEnums.Failed ||
+              {(group.status === RuntimeStateEnums.Failed ||
                 group.reason) && (
                 {group.reason || t("DownloadTasksPage.label.error")}
               )}
-              {group.status === GTaskEventStatusEnums.Cancelled && (
+              {group.status === RuntimeStateEnums.Cancelled && (
                 {t("DownloadTasksPage.label.cancelled")}
               )}
-              {(group.status === GTaskEventStatusEnums.Stopped ||
-                group.status === GTaskEventStatusEnums.Started) && (
+              {(group.status === RuntimeStateEnums.Stopped ||
+                group.status === RuntimeStateEnums.InProgress) && (
                 {
               ) : (
@@ -185,7 +180,7 @@ export const DownloadTasksPage = () => {
                     ml={1}
                     variant="ghost"
                     onClick={() => {
-                      group.status === GTaskEventStatusEnums.Started
+                      group.status === RuntimeStateEnums.InProgress
                         ? handleStopProgressiveTaskGroup(group.taskGroup)
                         : handleResumeProgressiveTaskGroup(
                             group.taskGroup
@@ -195,8 +190,8 @@ export const DownloadTasksPage = () => {
                 )}
-                {(group.status === GTaskEventStatusEnums.Failed ||
-                  group.status === GTaskEventStatusEnums.Cancelled) && (
+                {(group.status === RuntimeStateEnums.Failed ||
+                  group.status === RuntimeStateEnums.Cancelled) && (
                     ml={1}
                     variant="ghost"
                     onClick={() =>
-                      handleScheduleProgressiveTaskGroup(
-                        "retry",
-                        group.taskDescs
-                          .filter(
-                            (t) =>
-                              t.status !== TaskDescStatusEnums.Completed
-                          )
-                          .map((t) => t.payload)
-                      )
+                      handleRetryProgressiveTaskGroup(group.taskGroup)
                     }
                   />
                 )}
-                {group.status !== GTaskEventStatusEnums.Cancelled &&
-                  group.status !== GTaskEventStatusEnums.Completed && (
+                {group.status !== RuntimeStateEnums.Cancelled &&
+                  group.status !== RuntimeStateEnums.Completed && (
                 {
-              {group.status !== GTaskEventStatusEnums.Completed && (
+              {group.status !== RuntimeStateEnums.Completed && (
                 {
                   ?
                     group.taskDescs.map((task) => (
                       {
                       )
                     }
                   >
-                        {task.status !== TaskDescStatusEnums.Completed &&
-                          task.status !== TaskDescStatusEnums.Failed && (
+                        {task.status !== RuntimeStateEnums.Completed &&
+                          task.status !== RuntimeStateEnums.Failed && (
                         )}
-                        {task.status === TaskDescStatusEnums.Failed && (
+                        {task.status === RuntimeStateEnums.Failed && (
                           {t("DownloadTasksPage.label.error")}
                         )}
-                        {task.status === TaskDescStatusEnums.Completed && (
+                        {task.status === RuntimeStateEnums.Completed && (
-                            onClick={() => revealItemInDir(task.payload.dest)}
+                            onClick={() => revealItemInDir(task.dest)}
                           />
                         )}
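The retry path above shrinks from re-scheduling the unfinished tasks' payloads on the frontend to a single call keyed by the group name, leaving it to the backend to work out what remains. A sketch of the resulting call-site shape, assuming handleRetryProgressiveTaskGroup accepts the taskGroup string carried on TaskGroupDesc; useRetryGroup is an illustrative hook, not part of this PR.

// Sketch: gate retry on terminal group states, as the downloads page does.
import { useCallback } from "react";

import { useTaskContext } from "@/contexts/task";
import { RuntimeStateEnums, TaskGroupDesc } from "@/models/task";

function useRetryGroup(group: TaskGroupDesc) {
  const { handleRetryProgressiveTaskGroup } = useTaskContext();
  const canRetry =
    group.status === RuntimeStateEnums.Failed ||
    group.status === RuntimeStateEnums.Cancelled;
  // Hypothetical wrapper; the provider call itself is introduced by this PR.
  return useCallback(() => {
    if (canRetry) handleRetryProgressiveTaskGroup(group.taskGroup);
  }, [canRetry, group.taskGroup, handleRetryProgressiveTaskGroup]);
}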
diff --git a/src/pages/settings/dev-test.tsx b/src/pages/settings/dev-test.tsx
index acaa0eb32..9edadc91d 100644
--- a/src/pages/settings/dev-test.tsx
+++ b/src/pages/settings/dev-test.tsx
@@ -3,8 +3,12 @@
 import { invoke } from "@tauri-apps/api/core";
 import { useRouter } from "next/router";
 import { useEffect, useState } from "react";
 import SkinPreview from "@/components/skin-preview";
-import { DownloadTaskParam, TaskParam, TaskTypeEnums } from "@/models/task";
-import { TaskService } from "@/services/task";
+import { useTaskContext } from "@/contexts/task";
+import {
+  DownloadRuntimeTaskParam,
+  RuntimeTaskParam,
+  TaskTypeEnums,
+} from "@/models/task";
 import { isProd } from "@/utils/env";
 import { createWindow } from "@/utils/window";

@@ -14,6 +18,7 @@ import { createWindow } from "@/utils/window";
 // ============================================================
 const DevTestPage = () => {
+  const { handleScheduleProgressiveTaskGroup } = useTaskContext();
   const router = useRouter();
   useEffect(() => {
     if (isProd) {
@@ -61,14 +66,17 @@ const DevTestPage = () => {