From f491cc2998169facd00fb00c01dec0bf9271f6a4 Mon Sep 17 00:00:00 2001 From: Lifei Chen Date: Thu, 16 Dec 2021 17:07:29 +0800 Subject: [PATCH] feat: release for v0.1.15. (#9) Main feature includes: - Support agent smooth upgrade. - Remove dependency of sudo. - Use #[tokio::test] for some test. - Store ps1 with gbk code. --- CHANGELOG.md | 8 + Cargo.lock | 2 +- Cargo.toml | 2 +- Makefile | 2 +- install/install.bat | 1 - install/install.sh | 199 ++++--- install/self_update.bat | 1 - src/common/consts.rs | 6 +- src/daemonizer/windows.rs | 10 +- src/executor/powershell_command.rs | 12 +- src/executor/proc.rs | 831 ++++++++++++++--------------- src/executor/shell_command.rs | 232 +++----- src/http/store.rs | 2 +- src/http/thread.rs | 9 +- src/ontime/self_update.rs | 78 ++- src/ontime/thread.rs | 15 +- src/types/task.rs | 21 +- src/ws/thread.rs | 26 +- tests/support/response.rs | 2 + 19 files changed, 713 insertions(+), 746 deletions(-) mode change 100644 => 100755 src/executor/shell_command.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f5df41f..d79df7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,14 @@ # Changelog All notable changes to this project will be documented in this file. +## [0.1.15] - 2021-12-05 + +### Changed + +- Support agent smooth upgrade. +- Remove sudo dependency. +- Fix powershell script gbk error. + ## [0.1.14] - 2021-12-02 ### Changed diff --git a/Cargo.lock b/Cargo.lock index 4a0e311..10a956b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2427,7 +2427,7 @@ dependencies = [ [[package]] name = "tat_agent" -version = "0.1.14" +version = "0.1.15" dependencies = [ "async-attributes", "async-std", diff --git a/Cargo.toml b/Cargo.toml index cb3dc01..b7395b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tat_agent" -version = "0.1.14" +version = "0.1.15" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/Makefile b/Makefile index e2b9ced..23818f8 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ AGENT_IMG=$(DOCKER_REGISTRY)/tat-develop/tat_agent # test all case lib-test: - cargo test --lib -- --nocapture --skip ontime --skip executor::proc::tests::test_shell_cmd_timeout + cargo test --lib -- --nocapture --skip ontime integration-test: cargo test --test http_test diff --git a/install/install.bat b/install/install.bat index caf4d17..472ff92 100644 --- a/install/install.bat +++ b/install/install.bat @@ -9,7 +9,6 @@ if not exist "C:\Program Files\qcloud\tat_agent\workdir" ( md "C:\Program Files\qcloud\tat_agent\workdir" ) - sc query tatsvc | find "STATE" >nul && sc stop tatsvc >nul ||^ sc create tatsvc binPath= "C:\Program Files\qcloud\tat_agent\tat_agent.exe" start= auto diff --git a/install/install.sh b/install/install.sh index 31a0eea..7cba3de 100755 --- a/install/install.sh +++ b/install/install.sh @@ -1,11 +1,6 @@ #!/bin/bash cd `dirname $0` -need_restart=true -if [ "only_update" = ""$1 ]; then - need_restart=false -fi - PID_FILE="/var/run/tat_agent.pid" # install the agent binary @@ -14,38 +9,6 @@ PATH_DIR="/usr/sbin/" TAT_AGENT="tat_agent" TAT_AGENT32="tat_agent32" -# if arch is 32bit and 32bit bin exists, rename `tat_agent32` to `tat_agent` -machine=$(uname -m) -if [ "$machine" != "x86_64" ] && [ -f "$TAT_AGENT32" ]; then - mv ${TAT_AGENT} -f ${TAT_AGENT}64 - mv ${TAT_AGENT32} -f ${TAT_AGENT} -fi - -# check if agent runnable -chmod +x ${TAT_AGENT} -if ! ./${TAT_AGENT} -V; then - echo "tat_agent not runnable, exit." - exit 1 -fi - -mkdir -p ${SERVICE_DIR} -if [ $? -ne 0 ]; then - # handle special case for CoreOS whose /usr is Read-only - grep -q CoreOS /etc/os-release - if [ $? -eq 0 ]; then - SERVICE_DIR="/var/lib/qcloud/tat_agent/" - mkdir -p ${SERVICE_DIR} - PATH_DIR="/opt/bin/" - sed -i 's/\/usr\/local\/qcloud/\/var\/lib\/qcloud/g' tat_agent.service tat_agent_service.conf tat_agent_service uninstall.sh - sed -i 's/\/usr\/sbin/\/opt\/bin/g' uninstall.sh - else - echo 'Install fail, has no permission, may not root.' - exit 1 - fi -fi -cp -f ${TAT_AGENT} ${SERVICE_DIR} -ln -sf ${SERVICE_DIR}${TAT_AGENT} ${PATH_DIR}${TAT_AGENT} - has_systemd() { [[ `systemctl` =~ -\.mount ]] > /dev/null 2>&1 && return 0 if systemctl 2>/dev/null | grep -e "-\.mount" > /dev/null 2>&1; then @@ -74,46 +37,122 @@ has_upstart() { return 1 } -if has_systemd; then - echo "use systemd to manage service" - SYSTEMD_DIR="/etc/systemd/system/" - cp -f tat_agent.service ${SYSTEMD_DIR} - systemctl daemon-reload - systemctl enable tat_agent.service - if test "${need_restart}" = true; then - systemctl restart tat_agent.service - fi -elif has_upstart; then - echo "use upstart(initctl) to manage service" - cp -f tat_agent_service.conf /etc/init/ - if test "${need_restart}" = true; then - initctl stop tat_agent_service - initctl start tat_agent_service - fi -elif has_sysvinit; then - cp -f tat_agent_service /etc/init.d/ - chmod 755 /etc/init.d/tat_agent_service - /etc/init.d/tat_agent_service restart - which chkconfig > /dev/null 2>&1 - if [ $? -eq 0 ]; then - echo "use chkconfig to manage service" - chkconfig --add tat_agent_service - chkconfig tat_agent_service on - else - which update-rc.d > /dev/null 2>&1 - if [ $? -eq 0 ]; then - echo "use update-rc.d to manage service" - update-rc.d tat_agent_service defaults - else - echo "no proper daemon manager found, tat_agent can not auto start" - fi - fi -else - echo "no proper daemon manager found, tat_agent can not auto start" - kill `cat ${PID_FILE}` > /dev/null 2>&1 - sleep 1 - rm -f ${PID_FILE} - cd ${SERVICE_DIR} - ./${TAT_AGENT} - echo "tat_agent started" -fi +install() { + need_restart=$1 + + # if arch is 32bit and 32bit bin exists, rename `tat_agent32` to `tat_agent` + machine=$(uname -m) + if [ "$machine" != "x86_64" ] && [ -f "$TAT_AGENT32" ]; then + mv ${TAT_AGENT} -f ${TAT_AGENT}64 + mv ${TAT_AGENT32} -f ${TAT_AGENT} + fi + + # check if agent runnable + chmod +x ${TAT_AGENT} + if ! ./${TAT_AGENT} -V; then + echo "tat_agent not runnable, exit." + exit 1 + fi + + mkdir -p ${SERVICE_DIR} + if [ $? -ne 0 ]; then + # handle special case for CoreOS whose /usr is Read-only + grep -q CoreOS /etc/os-release + if [ $? -eq 0 ]; then + SERVICE_DIR="/var/lib/qcloud/tat_agent/" + mkdir -p ${SERVICE_DIR} + PATH_DIR="/opt/bin/" + sed -i 's/\/usr\/local\/qcloud/\/var\/lib\/qcloud/g' tat_agent.service tat_agent_service.conf tat_agent_service uninstall.sh + sed -i 's/\/usr\/sbin/\/opt\/bin/g' uninstall.sh + else + echo 'Install fail, has no permission, may not root.' + exit 1 + fi + fi + cp -f ${TAT_AGENT} ${SERVICE_DIR} + ln -sf ${SERVICE_DIR}${TAT_AGENT} ${PATH_DIR}${TAT_AGENT} + + if has_systemd; then + echo "use systemd to manage service" + SYSTEMD_DIR="/etc/systemd/system/" + cp -f tat_agent.service ${SYSTEMD_DIR} + systemctl daemon-reload + systemctl enable tat_agent.service + if test "${need_restart}" = true; then + systemctl restart tat_agent.service + fi + elif has_upstart; then + echo "use upstart(initctl) to manage service" + cp -f tat_agent_service.conf /etc/init/ + if test "${need_restart}" = true; then + initctl stop tat_agent_service + initctl start tat_agent_service + fi + elif has_sysvinit; then + cp -f tat_agent_service /etc/init.d/ + chmod 755 /etc/init.d/tat_agent_service + # TODO: uncomment following code after 0.1.14 is released. + # if test "${need_restart}" = true; then + /etc/init.d/tat_agent_service restart + # fi + which chkconfig > /dev/null 2>&1 + if [ $? -eq 0 ]; then + echo "use chkconfig to manage service" + chkconfig --add tat_agent_service + chkconfig tat_agent_service on + else + which update-rc.d > /dev/null 2>&1 + if [ $? -eq 0 ]; then + echo "use update-rc.d to manage service" + update-rc.d tat_agent_service defaults + else + echo "no proper daemon manager found, tat_agent can not auto start" + fi + fi + else + # TODO: uncomment following code after 0.1.14 is released. + # if test "${need_restart}" = true; then + echo "no proper daemon manager found, tat_agent can not auto start" + PID=$(cat ${PID_FILE}) + kill ${PID} > /dev/null 2>&1 + sleep 0.1 || sleep 1 + rm -f ${PID_FILE} + cd ${SERVICE_DIR} + ./${TAT_AGENT} + echo "tat_agent started" + # fi + fi +} + +restart() { + if has_systemd; then + echo "use systemd to manage service" + systemctl restart tat_agent.service + elif has_upstart; then + echo "use upstart(initctl) to manage service" + initctl stop tat_agent_service + initctl start tat_agent_service + elif has_sysvinit; then + /etc/init.d/tat_agent_service restart + else + PID=$(cat ${PID_FILE}) + kill ${PID} > /dev/null 2>&1 + sleep 0.1 || sleep 1 + rm -f ${PID_FILE} + cd ${SERVICE_DIR} + ./${TAT_AGENT} + echo "tat_agent started" + fi +} + +case $1 in + only_update) + install false + ;; + restart) + restart + ;; + *) + install true + ;; +esac \ No newline at end of file diff --git a/install/self_update.bat b/install/self_update.bat index 3d44f38..888d03e 100644 --- a/install/self_update.bat +++ b/install/self_update.bat @@ -5,4 +5,3 @@ set "temp=%temp:.=%" set newname=temp_%temp%.exe rename "C:\Program Files\qcloud\tat_agent\tat_agent.exe" %newname% copy /Y tat_agent.exe "C:\Program Files\qcloud\tat_agent\" - diff --git a/src/common/consts.rs b/src/common/consts.rs index 20b5d27..c5b29ba 100644 --- a/src/common/consts.rs +++ b/src/common/consts.rs @@ -24,12 +24,8 @@ cfg_if::cfg_if! { pub const TASK_LOG_PATH: &str = "/tmp/tat_agent/logs/"; pub const SELF_UPDATE_PATH: &str = "/tmp/tat_agent/self_update/"; pub const SELF_UPDATE_SCRIPT: &str = "self_update.sh"; + pub const INSTALL_SCRIPT: &str = "install.sh"; pub const AGENT_DEFAULT_WORK_DIRECTORY: &str = "/root"; - - // pre_exec fn name for cmd - pub const OWN_PROCESS_GROUP: &str = "OWN_PROCESS_GROUP"; - pub const DUP2_1_2: &str = "DUP2_1_2"; - pub const FILE_EXECUTE_PERMISSION_MODE: u32 = 0o755; pub const PIPE_BUF_DEFAULT_SIZE: usize = 64 * 4096; } else if #[cfg(windows)] { diff --git a/src/daemonizer/windows.rs b/src/daemonizer/windows.rs index 9c34a37..2083376 100644 --- a/src/daemonizer/windows.rs +++ b/src/daemonizer/windows.rs @@ -111,6 +111,14 @@ fn clean_update_files() { }) } +fn set_ps1_policy() { + let mut cmd = std::process::Command::new("PowerShell.exe"); + cmd.args(&["set-ExecutionPolicy", "RemoteSigned"]); + let mut child = cmd.spawn().unwrap(); + child.wait().map_err(|_| error!("set_ps1_policy fail")).ok(); +} + + fn set_work_dir() { let exe_path = env::current_exe().unwrap(); let work_dir = exe_path.parent().unwrap(); @@ -150,7 +158,7 @@ where pub fn daemonize(entry: fn()) { clean_update_files(); set_work_dir(); - + set_ps1_policy(); if already_start() { std::process::exit(183); } diff --git a/src/executor/powershell_command.rs b/src/executor/powershell_command.rs index 4a42399..71bfd8c 100644 --- a/src/executor/powershell_command.rs +++ b/src/executor/powershell_command.rs @@ -64,13 +64,6 @@ impl PowerShellCommand { } } - async fn set_ps1_policy(&self) { - let mut cmd = Command::new("PowerShell.exe"); - cmd.args(&["set-ExecutionPolicy", "RemoteSigned"]); - let child = cmd.spawn().unwrap(); - child.await.map_err(|_| error!("set_ps1_policy fail")).ok(); - } - fn work_dir_check(&self) -> Result<(), String> { if !wow64_disable_exc(|| Path::new(self.base.work_dir.as_str()).exists()) { let ret = format!( @@ -115,6 +108,7 @@ impl MyCommand for PowerShellCommand { 3. support set process group. */ async fn run(&mut self) -> Result<(), String> { + info!("=>PowerShellCommand::run()"); // store path check self.store_path_check()?; @@ -123,9 +117,6 @@ impl MyCommand for PowerShellCommand { let log_file = self.open_log_file()?; - // set policy - self.set_ps1_policy().await; - // create pipe let (our_pipe, their_pipe) = anon_pipe(true)?; @@ -141,6 +132,7 @@ impl MyCommand for PowerShellCommand { *self.base.pid.lock().unwrap() = Some(child.id()); let base = self.base.clone(); + info!("=>PowerShellCommand::tokio::spawn"); // async read output. tokio::spawn(async move { base.add_timeout_timer(); diff --git a/src/executor/proc.rs b/src/executor/proc.rs index 7b6abd1..210c445 100644 --- a/src/executor/proc.rs +++ b/src/executor/proc.rs @@ -678,91 +678,81 @@ mod tests { } } - #[test] - fn test_run_then_sleep() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - // it doesn't matter even if ./a.sh not exist - let log_path = format!("./{}.log", gen_rand_str()); - File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); - let ret = new( - CMD_PATH, - &username(), - CMD_TYPE, - "./", - 1024, - 1024, - log_path.as_str(), - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - assert!(ret.is_ok()); - info!("cmd running, pid:{}", cmd.pid()); - tokio::time::delay_for(Duration::from_secs(4)).await; - // now it's NOT a defunct process, cmd will be auto-waited - assert!(!is_process_exist(cmd.pid())); - //thread::sleep(Duration::new(10, 0)); - }); + #[tokio::test] + async fn test_run_then_sleep() { + init_log(); + // it doesn't matter even if ./a.sh not exist + let log_path = format!("./{}.log", gen_rand_str()); + File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); + let ret = new( + CMD_PATH, + &username(), + CMD_TYPE, + "./", + 1024, + 1024, + log_path.as_str(), + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + assert!(ret.is_ok()); + info!("cmd running, pid:{}", cmd.pid()); + tokio::time::delay_for(Duration::from_secs(4)).await; + // now it's NOT a defunct process, cmd will be auto-waited + assert!(!is_process_exist(cmd.pid()).await); + //thread::sleep(Duration::new(10, 0)); } - #[test] - fn test_run_start_fail_working_directory() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - let ret = new( - CMD_PATH, - &username(), - CMD_TYPE, - "./dir_not_exist", - 1024, - 1024, - "./fake_path", - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - info!("cmd run ret:[{}]", ret.unwrap_err()); - assert_eq!(cmd.pid(), 0); - assert_eq!(cmd.finish_result(), FINISH_RESULT_START_FAILED); - assert_eq!(cmd.exit_code(), 0); - assert!(cmd.err_info().starts_with("DirectoryNotExists")); - }); + #[tokio::test] + async fn test_run_start_fail_working_directory() { + init_log(); + let ret = new( + CMD_PATH, + &username(), + CMD_TYPE, + "./dir_not_exist", + 1024, + 1024, + "./fake_path", + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + info!("cmd run ret:[{}]", ret.unwrap_err()); + assert_eq!(cmd.pid(), 0); + assert_eq!(cmd.finish_result(), FINISH_RESULT_START_FAILED); + assert_eq!(cmd.exit_code(), 0); + assert!(cmd.err_info().starts_with("DirectoryNotExists")); } #[cfg(unix)] - #[test] - fn test_run_start_fail_user_not_exists() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - - let ret = new( - CMD_PATH, - "hacker-neo", - CMD_TYPE, - "./", - 1024, - 1024, - "./fake_path", - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - info!("cmd run ret:[{}]", ret.unwrap_err()); - assert_eq!(cmd.pid(), 0); - assert_eq!(cmd.finish_result(), FINISH_RESULT_START_FAILED); - assert_eq!(cmd.exit_code(), 0); - assert!(cmd.err_info().starts_with("UserNotExists")); - }); + #[tokio::test] + async fn test_run_start_fail_user_not_exists() { + init_log(); + let ret = new( + CMD_PATH, + "hacker-neo", + CMD_TYPE, + "./", + 1024, + 1024, + "./fake_path", + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + info!("cmd run ret:[{}]", ret.unwrap_err()); + assert_eq!(cmd.pid(), 0); + assert_eq!(cmd.finish_result(), FINISH_RESULT_START_FAILED); + assert_eq!(cmd.exit_code(), 0); + assert!(cmd.err_info().starts_with("UserNotExists")); } fn gen_rand_str() -> String { @@ -787,9 +777,9 @@ mod tests { } #[cfg(unix)] - fn is_process_exist(pid: u32) -> bool { + async fn is_process_exist(pid: u32) -> bool { // maybe need a time to clear the dir - thread::sleep(Duration::from_millis(2000)); + tokio::time::delay_for(Duration::from_millis(2000)).await; let path = format!("/proc/{}", pid); let ret = read_dir(path); let exist = ret.is_ok(); @@ -798,7 +788,7 @@ mod tests { } #[cfg(windows)] - fn is_process_exist(pid: u32) -> bool { + async fn is_process_exist(pid: u32) -> bool { let pid_str = format!("PID eq {}", pid); let output = std::process::Command::new("TASKLIST") .args(&["/FI", pid_str.as_str()]) @@ -808,400 +798,365 @@ mod tests { } #[cfg(unix)] - #[test] - fn test_pid_exist() { - let ret = is_process_exist(1); + #[tokio::test] + async fn test_pid_exist() { + let ret = is_process_exist(1).await; assert!(ret); } #[cfg(unix)] - #[test] - fn test_pid_not_exist() { - let ret = is_process_exist(0); + #[tokio::test] + async fn test_pid_not_exist() { + let ret = is_process_exist(0).await; assert!(!ret); } - #[test] - fn test_cancel() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - - cfg_if::cfg_if! { - if #[cfg(unix)] { - let filename = format!("./.{}.sh", gen_rand_str()); - create_file("sleep 15", filename.as_str()); - } else if #[cfg(windows)] { - let filename = format!("./{}.ps1", gen_rand_str()); - create_file("Start-Sleep -s 1500", filename.as_str()); - } + #[tokio::test] + async fn test_cancel() { + init_log(); + cfg_if::cfg_if! { + if #[cfg(unix)] { + let filename = format!("./.{}.sh", gen_rand_str()); + create_file("sleep 15", filename.as_str()); + } else if #[cfg(windows)] { + let filename = format!("./{}.ps1", gen_rand_str()); + create_file("Start-Sleep -s 1500", filename.as_str()); } + } + let log_path = format!("./{}.log", gen_rand_str()); + File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); + let ret = new( + filename.as_str(), + &username(), + CMD_TYPE, + "./", + 1024, + 1024, + log_path.as_str(), + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + assert!(ret.is_ok()); + info!("{} running, pid:{}", filename, cmd.pid()); + // now it's a still running + tokio::time::delay_for(Duration::new(10, 0)).await; + assert_eq!(cmd.is_started(), true); + assert!(is_process_exist(cmd.pid()).await); - let log_path = format!("./{}.log", gen_rand_str()); - File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); - let ret = new( - filename.as_str(), - &username(), - CMD_TYPE, - "./", - 1024, - 1024, - log_path.as_str(), - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - assert!(ret.is_ok()); - info!("{} running, pid:{}", filename, cmd.pid()); - // now it's a still running - thread::sleep(Duration::new(10, 0)); - assert_eq!(cmd.is_started(), true); - assert!(is_process_exist(cmd.pid())); - - let ret = cmd.cancel(); - assert!(ret.is_ok()); - thread::sleep(Duration::new(1, 0)); - assert!(!is_process_exist(cmd.pid())); - // cmd.cancel() called twice is OK and safe - let ret = cmd.cancel(); - assert!(ret.is_ok()); - // Now it's killed & waited, check it's NOT a defunct. - // Even after killed, call cmd.pid() is OK - info!("{} killed, pid:{}", filename, cmd.pid()); - info!("cmd:{:?}", cmd); - thread::sleep(Duration::new(5, 0)); - - fs::remove_file(filename.as_str()).unwrap(); - }); + let ret = cmd.cancel(); + assert!(ret.is_ok()); + tokio::time::delay_for(Duration::new(1, 0)).await; + assert!(!is_process_exist(cmd.pid()).await); + // cmd.cancel() called twice is OK and safe + let ret = cmd.cancel(); + assert!(ret.is_ok()); + // Now it's killed & waited, check it's NOT a defunct. + // Even after killed, call cmd.pid() is OK + info!("{} killed, pid:{}", filename, cmd.pid()); + info!("cmd:{:?}", cmd); + tokio::time::delay_for(Duration::new(5, 0)).await; + fs::remove_file(filename.as_str()).unwrap(); } - #[test] - fn test_output() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - - cfg_if::cfg_if! { - if #[cfg(unix)] { - let filename = format!("./.{}.sh", gen_rand_str()); - create_file("yes | head -10 && sleep 3 && yes | head -5", filename.as_str()); - } else if #[cfg(windows)] { - let filename = format!("./{}.ps1", gen_rand_str()); - create_file( - "foreach ($i in 1..10) { Write-Host '00' -NoNewLine };\ - Start-Sleep -s 3; \ - foreach ($i in 1..5) { Write-Host '11' -NoNewLine };", - filename.as_str(), - ); - } + #[tokio::test] + async fn test_output() { + init_log(); + cfg_if::cfg_if! { + if #[cfg(unix)] { + let filename = format!("./.{}.sh", gen_rand_str()); + create_file("yes | head -10 && sleep 3 && yes | head -5", filename.as_str()); + } else if #[cfg(windows)] { + let filename = format!("./{}.ps1", gen_rand_str()); + create_file( + "foreach ($i in 1..10) { Write-Host '00' -NoNewLine };\ + Start-Sleep -s 3; \ + foreach ($i in 1..5) { Write-Host '11' -NoNewLine };", + filename.as_str(), + ); } + } + let log_path = format!("./{}.log", gen_rand_str()); + File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); + let ret = new( + filename.as_str(), + &username(), + CMD_TYPE, + "./", + 1024, + 18, + log_path.as_str(), + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + assert!(ret.is_ok()); + info!("{} running, pid:{}", filename, cmd.pid()); + let mut cur_dropped = 0 as u64; + // usage of read output + loop { + tokio::time::delay_for(Duration::from_secs(1)).await; + let len = cmd.cur_output_len(); + // is_finished() MUST be called after cur_output_len() + let finished = cmd.is_finished(); + if 0 != len && 0 == cur_dropped { + let (out, idx, dropped) = cmd.next_output(); + info!( + "ready to report output:{:?}, output_debug:{}, idx:{}, dropped:{}", + out, + String::from_utf8_lossy(&out[..]), + idx, + dropped + ); + assert_eq!(idx, 0); + assert_eq!(dropped, 2); + + // Do output report task here + // do_report(out, idx, dropped); + if dropped > 0 { + // max report exceeds, get dropped and idx during sleep + let (out, idx, dropped_new) = cmd.next_output(); + info!("during sleep: idx: {}, drop {}, ", idx, dropped); + assert_eq!(idx, 1); + assert_eq!(dropped_new, dropped); + assert_eq!(0, out.len()); + info!("dropped, not report output any more"); - let log_path = format!("./{}.log", gen_rand_str()); - File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); - let ret = new( - filename.as_str(), - &username(), - CMD_TYPE, - "./", - 1024, - 18, - log_path.as_str(), - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - assert!(ret.is_ok()); - info!("{} running, pid:{}", filename, cmd.pid()); - let mut cur_dropped = 0 as u64; - - // usage of read output - loop { - tokio::time::delay_for(Duration::from_secs(1)).await; - let len = cmd.cur_output_len(); - // is_finished() MUST be called after cur_output_len() - let finished = cmd.is_finished(); - if 0 != len && 0 == cur_dropped { - let (out, idx, dropped) = cmd.next_output(); - info!( - "ready to report output:{:?}, output_debug:{}, idx:{}, dropped:{}", - out, - String::from_utf8_lossy(&out[..]), - idx, - dropped - ); - assert_eq!(idx, 0); - assert_eq!(dropped, 2); - - // Do output report task here - // do_report(out, idx, dropped); - if dropped > 0 { - // max report exceeds, get dropped and idx during sleep - let (out, idx, dropped_new) = cmd.next_output(); - info!("during sleep: idx: {}, drop {}, ", idx, dropped); - assert_eq!(idx, 1); - assert_eq!(dropped_new, dropped); - assert_eq!(0, out.len()); - info!("dropped, not report output any more"); - - cur_dropped = dropped_new; - } + cur_dropped = dropped_new; } + } - if finished { - let (out, idx, dropped) = cmd.next_output(); - info!("after sleep: idx: {}, drop {}", idx, dropped); - assert_eq!(idx, 2); - assert_eq!(dropped, 12); - assert_eq!(0, out.len()); - // do_report(out, idx, dropped); - info!("finished, report final dropped bytes of output."); - break; - } + if finished { + let (out, idx, dropped) = cmd.next_output(); + info!("after sleep: idx: {}, drop {}", idx, dropped); + assert_eq!(idx, 2); + assert_eq!(dropped, 12); + assert_eq!(0, out.len()); + // do_report(out, idx, dropped); + info!("finished, report final dropped bytes of output."); + break; } - // will see the output bytes in cmd.output - info!("cmd:{:?}", cmd); - thread::sleep(Duration::new(1, 0)); - assert!(!is_process_exist(cmd.pid())); + } + // will see the output bytes in cmd.output + info!("cmd:{:?}", cmd); + tokio::time::delay_for(Duration::new(1, 0)).await; + assert!(!is_process_exist(cmd.pid()).await); - fs::remove_file(filename.as_str()).unwrap(); - }); + fs::remove_file(filename.as_str()).unwrap(); } - #[test] - fn test_base64() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - - cfg_if::cfg_if! { - if #[cfg(unix)] { - let filename = format!("./.{}.sh", gen_rand_str()); - create_file("echo -n 'hello world'", filename.as_str()); - } else if #[cfg(windows)] { - let filename = format!("./{}.ps1", gen_rand_str()); - create_file( - "Write-Host 'hello world' -NoNewLine", - filename.as_str(), - ); - } - } - let log_path = format!("./{}.log", gen_rand_str()); - File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); - let ret = new( - filename.as_str(), - &username(), - CMD_TYPE, - "./", - 10, - 1024, - log_path.as_str(), - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - assert!(ret.is_ok()); - info!("{} running, pid:{}", filename, cmd.pid()); - - while !cmd.is_finished() { - thread::sleep(Duration::new(1, 0)); + #[tokio::test] + async fn test_base64() { + init_log(); + cfg_if::cfg_if! { + if #[cfg(unix)] { + let filename = format!("./.{}.sh", gen_rand_str()); + create_file("echo -n 'hello world'", filename.as_str()); + } else if #[cfg(windows)] { + let filename = format!("./{}.ps1", gen_rand_str()); + create_file( + "Write-Host 'hello world' -NoNewLine", + filename.as_str(), + ); } + } + let log_path = format!("./{}.log", gen_rand_str()); + File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); + let ret = new( + filename.as_str(), + &username(), + CMD_TYPE, + "./", + 10, + 1024, + log_path.as_str(), + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + assert!(ret.is_ok()); + info!("{} running, pid:{}", filename, cmd.pid()); - let (out, idx, dropped) = cmd.next_output(); - let out = base64::encode(out); + while !cmd.is_finished() { + tokio::time::delay_for(Duration::new(1, 0)).await; + } - assert_eq!(dropped, 0); - assert_eq!(0, idx); - assert_eq!(out, "aGVsbG8gd29ybGQ="); - info!("out:{}", out); - info!("cmd:{:?}", cmd); - thread::sleep(Duration::new(1, 0)); - assert!(!is_process_exist(cmd.pid())); + let (out, idx, dropped) = cmd.next_output(); + let out = base64::encode(out); - fs::remove_file(filename.as_str()).unwrap(); - }); + assert_eq!(dropped, 0); + assert_eq!(0, idx); + assert_eq!(out, "aGVsbG8gd29ybGQ="); + info!("out:{}", out); + info!("cmd:{:?}", cmd); + tokio::time::delay_for(Duration::new(1, 0)).await; + assert!(!is_process_exist(cmd.pid()).await); + fs::remove_file(filename.as_str()).unwrap(); } - #[test] - // NOTICE: This testcase has use singleton of timer, - // All testcase share the same one timer, so: - // This testcase can NOT run together with test_timer_in_one_case - fn test_shell_cmd_timeout() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - - cfg_if::cfg_if! { - if #[cfg(unix)] { - let filename = format!("./.{}.sh", gen_rand_str()); - create_file("pwd && sleep 10240", filename.as_str()); - } else if #[cfg(windows)] { - let filename = format!("./{}.ps1", gen_rand_str()); - create_file( - "Start-Sleep -s 10240", - filename.as_str(), - ); - } + #[tokio::test] + async fn test_shell_cmd_timeout() { + init_log(); + cfg_if::cfg_if! { + if #[cfg(unix)] { + let filename = format!("./.{}.sh", gen_rand_str()); + create_file("pwd && sleep 10240", filename.as_str()); + } else if #[cfg(windows)] { + let filename = format!("./{}.ps1", gen_rand_str()); + create_file( + "Start-Sleep -s 10240", + filename.as_str(), + ); } + } - let start_time = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - info!("start_time:{}", start_time); - let log_path = format!("./{}.log", gen_rand_str()); - File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); - let ret = new( - filename.as_str(), - &username(), - CMD_TYPE, - "./", - 2, - 1024, - log_path.as_str(), - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - assert!(ret.is_ok()); - assert!(is_process_exist(cmd.pid())); - let instant = Instant::now(); - info!("{} running, pid:{}", filename, cmd.pid()); - thread::sleep(Duration::new(1, 0)); - - let mut cnt = 0; - loop { - { - let timer = Timer::get_instance(); - let mut timer = timer.lock().unwrap_or_exit(""); - info!("timer:{:?}", timer); - let tasks = timer.tasks_to_schedule(); - cnt += tasks.len(); - for task in tasks { - task.run_task(); - } - } - info!("total {} tasks run", cnt); - thread::sleep(Duration::new(0, 500_000_000)); - let finished = cmd.is_finished(); - if finished { - break; + let start_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + info!("script {} start_time:{}",filename, start_time); + let log_path = format!("./{}.log", gen_rand_str()); + File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); + let ret = new( + filename.as_str(), + &username(), + CMD_TYPE, + "./", + 2, + 1024, + log_path.as_str(), + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + assert!(ret.is_ok()); + let instant = Instant::now(); + info!("{} running, pid:{}", filename, cmd.pid()); + let mut cnt = 0; + loop { + { + let timer = Timer::get_instance(); + let mut timer = timer.lock().unwrap_or_exit(""); + info!("timer:{:?}", timer); + let tasks = timer.tasks_to_schedule(); + cnt += tasks.len(); + for task in tasks { + task.run_task(); } } - info!("cmd:{:?}", cmd); - info!("finish result:{}", cmd.finish_result()); - assert!(cmd.is_timeout()); - assert!(instant.elapsed() <= Duration::from_secs(5)); - assert!(0 < cmd.finish_time()); - assert!(cmd.finish_time() < start_time + 5); - assert!(!is_process_exist(cmd.pid())); - fs::remove_file(filename.as_str()).unwrap(); - }); + info!("total {} tasks run", cnt); + tokio::time::delay_for(Duration::new(0, 500_000_000)).await; + let finished = cmd.is_finished(); + if finished { + break; + } + } + info!("cmd:{:?}", cmd); + info!("finish result:{}", cmd.finish_result()); + assert!(cmd.is_timeout()); + assert!(instant.elapsed() <= Duration::from_secs(5)); + assert!(0 < cmd.finish_time()); + assert!(cmd.finish_time() < start_time + 5); + assert!(!is_process_exist(cmd.pid()).await); + fs::remove_file(filename.as_str()).unwrap(); } - #[test] + #[tokio::test] #[cfg(unix)] - fn test_daemon() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - let filename = format!("./.{}.sh", gen_rand_str()); - create_file("echo 'hello world'\nsleep 10 &\ndate", filename.as_str()); - - let log_path = format!("./{}.log", gen_rand_str()); - File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); - let ret = new( - filename.as_str(), - &username(), - CMD_TYPE, - "./", - 6, - 1024, - log_path.as_str(), - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - assert!(ret.is_ok()); - - loop { - { - let timer = Timer::get_instance(); - let mut timer = timer.lock().unwrap_or_exit(""); - info!("timer:{:?}", timer); - let tasks = timer.tasks_to_schedule(); - for task in tasks { - task.run_task(); - } - } - thread::sleep(Duration::from_secs(1)); - let finished = cmd.is_finished(); - if finished { - break; + async fn test_daemon() { + init_log(); + let filename = format!("./.{}.sh", gen_rand_str()); + create_file("echo 'hello world'\nsleep 10 &\ndate", filename.as_str()); + let log_path = format!("./{}.log", gen_rand_str()); + File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); + let ret = new( + filename.as_str(), + &username(), + CMD_TYPE, + "./", + 6, + 1024, + log_path.as_str(), + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + assert!(ret.is_ok()); + loop { + { + let timer = Timer::get_instance(); + let mut timer = timer.lock().unwrap_or_exit(""); + info!("timer:{:?}", timer); + let tasks = timer.tasks_to_schedule(); + for task in tasks { + task.run_task(); } } - assert_eq!(cmd.is_timeout(), false); - fs::remove_file(filename.as_str()).unwrap(); - }); + tokio::time::delay_for(Duration::from_millis(100)).await; + let finished = cmd.is_finished(); + if finished { + break; + } + } + assert_eq!(cmd.is_timeout(), false); + fs::remove_file(filename.as_str()).unwrap(); } - #[test] + #[tokio::test] #[cfg(unix)] - fn test_daemon_output() { - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - init_log(); - let filename = format!("./.{}.sh", gen_rand_str()); - create_file( - "yes | head -1024 \nsleep 1200 &\n yes | head -1025", - filename.as_str(), - ); + async fn test_daemon_output() { + init_log(); + let filename = format!("./.{}.sh", gen_rand_str()); + create_file( + "yes | head -1024 \nsleep 1200 &\n yes | head -1025", + filename.as_str(), + ); - let log_path = format!("./{}.log", gen_rand_str()); - File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); - let ret = new( - filename.as_str(), - &username(), - CMD_TYPE, - "./", - 1200, - 10240, - log_path.as_str(), - "", - "", - "", - ); - let mut cmd = ret.unwrap(); - let ret = cmd.run().await; - assert!(ret.is_ok()); - - loop { - { - let timer = Timer::get_instance(); - let mut timer = timer.lock().unwrap_or_exit(""); - //info!("timer:{:?}", timer); - let tasks = timer.tasks_to_schedule(); - for task in tasks { - task.run_task(); - } - } - thread::sleep(Duration::from_secs(1)); - let finished = cmd.is_finished(); - if finished { - break; + let log_path = format!("./{}.log", gen_rand_str()); + File::create(log_path.as_str()).unwrap_or_exit("create log path fail."); + let ret = new( + filename.as_str(), + &username(), + CMD_TYPE, + "./", + 1200, + 10240, + log_path.as_str(), + "", + "", + "", + ); + let mut cmd = ret.unwrap(); + let ret = cmd.run().await; + assert!(ret.is_ok()); + + loop { + { + let timer = Timer::get_instance(); + let mut timer = timer.lock().unwrap_or_exit(""); + //info!("timer:{:?}", timer); + let tasks = timer.tasks_to_schedule(); + for task in tasks { + task.run_task(); } } - assert_eq!(cmd.is_timeout(), false); - fs::remove_file(filename.as_str()).unwrap(); - }); + tokio::time::delay_for(Duration::from_secs(1)).await; + let finished = cmd.is_finished(); + if finished { + break; + } + } + assert_eq!(cmd.is_timeout(), false); + fs::remove_file(filename.as_str()).unwrap(); } } diff --git a/src/executor/shell_command.rs b/src/executor/shell_command.rs old mode 100644 new mode 100755 index 485db8d..f9df837 --- a/src/executor/shell_command.rs +++ b/src/executor/shell_command.rs @@ -1,4 +1,4 @@ -use std::env; +use std::collections::HashMap; use std::fmt::Debug; use std::fs::File; use std::io::Write; @@ -6,20 +6,20 @@ use std::path::Path; use std::process::Stdio; use std::sync::Arc; use std::time::Duration; -use std::{fmt, io}; +use std::{env, fmt, io}; +use crate::common::consts::PIPE_BUF_DEFAULT_SIZE; +use crate::executor::proc::{self, BaseCommand, MyCommand}; +use crate::start_failed_err_info; use async_trait::async_trait; use libc; use log::{debug, error, info}; +use procfs::process::Process; use tokio::io::{AsyncReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::time::timeout; -use users::get_user_by_name; - -use crate::common::asserts::GracefulUnwrap; -use crate::common::consts::{DUP2_1_2, OWN_PROCESS_GROUP, PIPE_BUF_DEFAULT_SIZE}; -use crate::executor::proc::{BaseCommand, MyCommand}; -use crate::start_failed_err_info; +use users::os::unix::UserExt; +use users::{get_user_by_name, User}; pub struct ShellCommand { base: Arc, @@ -51,29 +51,20 @@ impl ShellCommand { } } - fn sudo_check(&self) -> Result<(), String> { - if !cmd_exists("sudo") { - let ret = format!( - "ShellCommand {} start fail, working_directory:{}, username: {}: sudo not exists", - self.base.cmd_path, self.base.work_dir, self.base.username - ); - *self.base.err_info.lock().unwrap() = start_failed_err_info!(ERR_SUDO_NOT_EXISTS); - return Err(ret); - } - Ok(()) - } - - fn user_check(&self) -> Result<(), String> { - if !user_exists(self.base.username.as_str()) { - let ret = format!( - "ShellCommand {} start fail, working_directory:{}, username: {}: user not exists", - self.base.cmd_path, self.base.work_dir, self.base.username - ); - *self.base.err_info.lock().unwrap() = - start_failed_err_info!(ERR_USER_NOT_EXISTS, self.base.username); - return Err(ret); - } - Ok(()) + fn user_check(&self) -> Result { + let user = get_user_by_name(self.base.username.as_str()); + return match user { + Some(user) => Ok(user), + None => { + let ret = format!( + "ShellCommand {} start fail, working_directory:{}, username: {}: user not exists", + self.base.cmd_path, self.base.work_dir, self.base.username + ); + *self.base.err_info.lock().unwrap() = + start_failed_err_info!(ERR_USER_NOT_EXISTS, self.base.username); + Err(ret) + } + }; } fn work_dir_check(&self) -> Result<(), String> { @@ -89,53 +80,42 @@ impl ShellCommand { Ok(()) } - fn work_dir_permission_check(&self) -> Result<(), String> { - if !working_directory_permission(self.base.work_dir.as_str(), self.base.username.as_str()) { - let ret = format!( - "ShellCommand {} start fail, working_directory:{}, username: {}: user has no permission to working_directory.", - self.base.cmd_path, self.base.work_dir, self.base.username - ); - *self.base.err_info.lock().unwrap() = start_failed_err_info!( - ERR_USER_NO_PERMISSION_OF_WORKING_DIRECTORY, - self.base.username, - self.base.work_dir - ); - return Err(ret); - } - Ok(()) - } + fn prepare_cmd(&self, user: User) -> Command { + let mut envs = HashMap::new(); + match user.home_dir().to_str() { + Some(dir) => { + envs.insert("HOME", dir); + } + None => {} + }; + envs.insert("USER", self.base.username.as_str()); + envs.insert("LOGNAME", self.base.username.as_str()); + + let mut shell = "bash"; + let mut login_init = ". ~/.bash_profile 2> /dev/null || . ~/.bashrc 2> /dev/null ; "; + if !cmd_exists(shell) { + shell = "sh"; + login_init = ""; + }; + let entrypoint = format!("{}{}", login_init, self.base.cmd_path()); + + let mut cmd = Command::new(shell); + cmd.args(&["-c", entrypoint.as_str()]) + .uid(user.uid()) + .gid(user.primary_group_id()) + .current_dir(self.base.work_dir.clone()) + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .envs(envs); - fn prepare_cmd(&self) -> Command { - let mut shell = "sh"; - let mut entrypoint = format!( - "cd {} && {}", - self.base.work_dir.as_str(), - self.base.cmd_path.as_str() - ); - if cmd_exists("bash") { - shell = "bash"; - entrypoint = format!( - ". ~/.bash_profile 2> /dev/null || . ~/.bashrc 2> /dev/null ; {}", - entrypoint - ); + unsafe { + // Redirect stderr to stdout, thus the output order will be exactly same with origin + cmd.pre_exec(dup2_1_2); + // The command and its sub-processes will be in an independent process group, + // thus we can kill them cleanly by kill the whole process group when we need. + cmd.pre_exec(own_process_group); } - let mut cmd = Command::new("sudo"); - cmd.args(&[ - "-Hu", - self.base.username.as_str(), - shell, - "-c", - entrypoint.as_str(), - ]) - .stdin(Stdio::null()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - - // Redirect stderr to stdout, thus the output order will be exactly same with origin - pre_exec_for_cmd(&mut cmd, DUP2_1_2); - // The command and its sub-processes will be in an independent process group, - // thus we can kill them cleanly by kill the whole process group when we need. - pre_exec_for_cmd(&mut cmd, OWN_PROCESS_GROUP); cmd } } @@ -146,18 +126,14 @@ impl MyCommand for ShellCommand { // pre check before spawn cmd self.store_path_check()?; - self.sudo_check()?; - - self.user_check()?; - self.work_dir_check()?; - self.work_dir_permission_check()?; + let user = self.user_check()?; let log_file = self.open_log_file()?; // start the process async - let mut child = self.prepare_cmd().spawn().map_err(|e| { + let mut child = self.prepare_cmd(user).spawn().map_err(|e| { *self.base.err_info.lock().unwrap() = e.to_string(); format!( "ShellCommand {}, working_directory:{}, start fail: {}", @@ -200,8 +176,12 @@ impl BaseCommand { let stdout = child.stdout.take(); let mut reader = BufReader::new(stdout.unwrap()); let mut byte_after_finish = 0; + let proc = match Process::new(pid as i32) { + Ok(proc)=> proc, + Err(_)=> return , + }; loop { - let process_finish = is_process_finish(pid); + let process_finish = !proc.is_alive(); let timeout_read = timeout(Duration::from_millis(100), reader.read(&mut buffer[..])).await; @@ -269,71 +249,10 @@ impl BaseCommand { } } -fn is_process_finish(pid: u32) -> bool { - return match procfs::process::Process::new(pid as i32) { - Ok(proc) => { - if proc.stat.state == 'Z' || proc.stat.state == 'z'{ - true - } else { - false - } - } - Err(_) => true, - }; -} - -fn pre_exec_for_cmd(cmd: &mut Command, func_name: &str) { - let func = match func_name { - DUP2_1_2 => dup2_1_2, - OWN_PROCESS_GROUP => own_process_group, - _ => Err("").unwrap_or_exit( - format!("invalid func_name of pre_exec_for_cmd: {}", func_name).as_str(), - ), - }; - unsafe { - cmd.pre_exec(func); - } -} - fn working_directory_exists(path: &str) -> bool { return Path::new(path).exists(); } -fn working_directory_permission(dir: &str, username: &str) -> bool { - let ret = std::process::Command::new("sudo") - .args(&["-u", username, "sh", "-c", "cd", dir]) - .status(); - match ret { - Ok(status) => return status.success(), - Err(e) => { - error!( - "check working_directory permission err:{}, username:{}, dir: {}", - e, username, dir - ); - false - } - } -} - -fn user_exists(username: &str) -> bool { - if let None = get_user_by_name(username) { - return false; - } - true -} - -fn cmd_exists(cmd: &str) -> bool { - if let Ok(path) = env::var("PATH") { - for p in path.split(":") { - let p_str = format!("{}/{}", p, cmd); - if Path::new(&p_str).exists() { - return true; - } - } - } - false -} - fn dup2_1_2() -> Result<(), io::Error> { unsafe { if libc::dup2(1, 2) != -1 { @@ -354,6 +273,18 @@ fn own_process_group() -> Result<(), io::Error> { } } +fn cmd_exists(cmd: &str) -> bool { + if let Ok(path) = env::var("PATH") { + for p in path.split(":") { + let p_str = format!("{}/{}", p, cmd); + if Path::new(&p_str).exists() { + return true; + } + } + } + false +} + #[cfg(test)] mod tests { use super::*; @@ -364,21 +295,8 @@ mod tests { println!("fmt cmd:{:?}", cmd); } - #[test] - fn test_user_exists() { - assert_eq!(user_exists("root"), true); - assert_eq!(user_exists("hacker-neo"), false); - } - #[test] fn test_working_directory_exists() { assert_eq!(working_directory_exists("/etc"), true); - assert_eq!(user_exists("/etcdefg"), false); - } - - #[test] - fn test_cmd_exists() { - assert_eq!(cmd_exists("pwd"), true); - assert_eq!(cmd_exists("pwd110"), false); } } diff --git a/src/http/store.rs b/src/http/store.rs index bbb28a7..b38d418 100644 --- a/src/http/store.rs +++ b/src/http/store.rs @@ -168,7 +168,7 @@ impl TaskFileStore { // store task file let mut file = self.create_file(&task_file_path, true, true)?; let s = t.decode_command()?; - let res = file.write_all(s.as_bytes()); + let res = file.write_all(&s); if res.is_err() { return Err("fail to store command in task file".to_string()); } diff --git a/src/http/thread.rs b/src/http/thread.rs index 46f5653..5e9c528 100644 --- a/src/http/thread.rs +++ b/src/http/thread.rs @@ -372,7 +372,7 @@ impl HttpWorker { proc_res .run() .await - .map_err(|_| error!("start process fail")) + .map_err(|e| error!("start process fail {}",e)) .ok(); let cmd_arc = Arc::new(Mutex::new(proc_res)); @@ -404,6 +404,8 @@ mod tests { use std::time::Duration; use tokio::sync::Mutex; use tokio::time::timeout; + #[cfg(unix)] + use users::get_current_username; fn gen_rand_str() -> String { thread_rng().sample_iter(&Alphanumeric).take(10).collect() @@ -441,7 +443,10 @@ mod tests { time_out, command: cmd.to_string(), command_type: cmd_type.to_string(), - username: "root".to_string(), + #[cfg(unix)] + username: String::from(get_current_username().unwrap().to_str().unwrap()), + #[cfg(windows)] + username: "system".to_string(), working_directory: "./".to_string(), cos_bucket_url: "".to_string(), cos_bucket_prefix: "".to_string(), diff --git a/src/ontime/self_update.rs b/src/ontime/self_update.rs index aa37e90..ac08e59 100644 --- a/src/ontime/self_update.rs +++ b/src/ontime/self_update.rs @@ -1,35 +1,33 @@ use std::fs::{create_dir_all, File}; use std::io; use std::io::Write; -#[cfg(unix)] -use std::os::unix::fs::PermissionsExt; -#[cfg(unix)] -use std::fs::{set_permissions,Permissions}; - -#[cfg(windows)] -use crate::daemonizer::wow64_disable_exc; use std::process::Command; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use bytes::Bytes; -use log::debug; -use log::error; -use log::info; -use log::warn; -use tokio::runtime::Builder; -use tokio::runtime::Runtime; +use log::{debug, error, info, warn}; +use tokio::runtime::{Builder, Runtime}; use unzip::{Unzipper, UnzipperStats}; use crate::common::consts::{ - AGENT_FILENAME, INVOKE_API, SELF_UPDATE_FILENAME, - SELF_UPDATE_PATH, SELF_UPDATE_SCRIPT, UPDATE_DOWNLOAD_TIMEOUT, UPDATE_FILE_UNZIP_DIR, + AGENT_FILENAME, INVOKE_API, SELF_UPDATE_FILENAME, SELF_UPDATE_PATH, SELF_UPDATE_SCRIPT, + UPDATE_DOWNLOAD_TIMEOUT, UPDATE_FILE_UNZIP_DIR, }; -#[cfg(unix)] -use crate::common::consts::{FILE_EXECUTE_PERMISSION_MODE}; use crate::http::{HttpRequester, InvokeAPIAdapter, Requester}; use crate::types::{AgentError, CheckUpdateResponse, HttpMethod}; +cfg_if::cfg_if! { + if #[cfg(unix)] { + use std::os::unix::fs::PermissionsExt; + use std::fs::{set_permissions,Permissions}; + + use crate::common::consts::{INSTALL_SCRIPT, FILE_EXECUTE_PERMISSION_MODE}; + } else if #[cfg(windows)] { + use crate::daemonizer::wow64_disable_exc; + } +} + pub fn try_update(self_updating: Arc, need_restart: Arc) { let rt_res = Builder::new().basic_scheduler().enable_all().build(); if let Err(e) = rt_res { @@ -307,8 +305,8 @@ fn run_self_update_script( #[cfg(unix)] let cmd = Command::new("sh").arg("-c").arg(script).output(); #[cfg(windows)] - let cmd = wow64_disable_exc(move ||{ - Command::new(script.clone()).output()}); + let cmd = wow64_disable_exc(move || Command::new(script.clone()).output()); + if let Err(e) = cmd { return Err(format!("self update run ret: {:?}", e)); } @@ -325,6 +323,48 @@ fn run_self_update_script( } } +pub fn try_restart_agent() -> Result<(), String> { + cfg_if::cfg_if! { + if #[cfg(unix)] { + let script = format!("{}/{}/{}", + SELF_UPDATE_PATH.to_string(), + UPDATE_FILE_UNZIP_DIR.to_string(), + INSTALL_SCRIPT.to_string(), + ); + if let Err(e) = set_execute_permission(script.clone()) { + return Err(format!("set execute permission fail: {:?}", e)) + } + let cmd = Command::new("sh") + .args(&[ + "-c", + &script, + "restart" + ]) + .output(); + } else if #[cfg(windows)] { + let cmd = wow64_disable_exc(move ||{ + Command::new("cmd.exe") + .args(&["/C","sc stop tatsvc & sc start tatsvc"]) + .output()}); + } + } + + if let Err(e) = cmd { + return Err(format!("run cmd fail: {:?}", e)); + } + let out = cmd.unwrap(); + let stdout = String::from_utf8_lossy(out.stdout.as_slice()); + let stderr = String::from_utf8_lossy(out.stderr.as_slice()); + debug!("stdout of try restart agent:[{}]", stdout); + debug!("stderr of try restart agent:[{}]", stderr); + + if out.status.success() { + Ok(()) + } else { + Err(format!("ret code:{:?}", out.status.code())) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/ontime/thread.rs b/src/ontime/thread.rs index 79324b1..90423d4 100644 --- a/src/ontime/thread.rs +++ b/src/ontime/thread.rs @@ -13,11 +13,8 @@ use futures01::sync::mpsc::UnboundedSender; use websocket::OwnedMessage; use crate::common::asserts::GracefulUnwrap; -use crate::common::consts::{ - ONTIME_CHECK_TASK_NUM, ONTIME_KICK_INTERVAL, ONTIME_KICK_SOURCE, ONTIME_PING_INTERVAL, - ONTIME_THREAD_INTERVAL, ONTIME_UPDATE_INTERVAL, -}; -use crate::ontime::self_update::try_update; +use crate::common::consts::{ONTIME_CHECK_TASK_NUM, ONTIME_KICK_INTERVAL, ONTIME_KICK_SOURCE, ONTIME_PING_INTERVAL, ONTIME_THREAD_INTERVAL, ONTIME_UPDATE_INTERVAL}; +use crate::ontime::self_update::{try_update, try_restart_agent}; use crate::ontime::timer::Timer; use crate::types::inner_msg::KickMsg; @@ -216,9 +213,15 @@ fn check_running_task_num( let task_num = running_task_num.load(Ordering::SeqCst); if task_num == 0 { info!( - "running tasks num: {}, need_restart is {}, exit prgram", + "running tasks num: {}, need_restart is {}, restart program.", task_num, restart_flag ); + + if let Err(e) = try_restart_agent(){ + warn!("try restart agent fail: {:?}", e) + } + + // should not comes here, because agent should has been killed when called `try_restart_agent`. std::process::exit(2); } debug!( diff --git a/src/types/task.rs b/src/types/task.rs index ca0c254..a90a4e0 100644 --- a/src/types/task.rs +++ b/src/types/task.rs @@ -4,7 +4,10 @@ use serde::{Deserialize, Serialize}; use crate::uname::common::UnameExt; use crate::uname::Uname; - +#[cfg(windows)] +use winapi::um::winnls::GetOEMCP; +#[cfg(windows)] +use codepage_strings::Coding; //============================================================================== // Declare standard request and response format for C/S communication // general parameters in reqeust @@ -159,12 +162,16 @@ pub struct DescribeTasksResponse { pub type DescribeTasksRequest = Empty; impl InvocationNormalTask { - pub fn decode_command(&self) -> Result { + pub fn decode_command(&self) -> Result, String> { match base64::decode(&self.command) { - Ok(command) => match std::str::from_utf8(&command) { - Ok(s) => Ok(String::from(s)), - Err(e) => Err(format!("parse error: {:?}", e)), - }, + Ok(command) => { + #[cfg(windows)] + let command = Coding::new(unsafe { GetOEMCP() } as u16) + .map_err(|e| e.to_string())? + .encode(String::from_utf8_lossy(&command)) + .map_err(|e|e.to_string())?; + Ok(command) + } Err(e) => Err(format!("decode error: {:?}", e)), } } @@ -402,7 +409,7 @@ mod tests { cos_bucket_prefix: format!(""), }; assert_eq!( - tasks1.decode_command().unwrap(), + String::from_utf8_lossy(&tasks1.decode_command().unwrap()), String::from("ls -l;\necho \"Hello World\"") ); } diff --git a/src/ws/thread.rs b/src/ws/thread.rs index 45db934..d1f4c0c 100644 --- a/src/ws/thread.rs +++ b/src/ws/thread.rs @@ -1,8 +1,8 @@ -use std::{thread, time}; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::Sender; +use std::sync::Arc; use std::time::Duration; +use std::{thread, time}; use futures01::future::Future; use futures01::sink::{Sink, Wait}; @@ -15,23 +15,22 @@ use log::info; use serde_json; use tokio01 as tokio; use tokio01::prelude::FutureExt; -use websocket::{ClientBuilder, CloseData, OwnedMessage}; use websocket::header::Headers; use websocket::result::WebSocketError; - +use websocket::{ClientBuilder, CloseData, OwnedMessage}; use crate::common::asserts::GracefulUnwrap; use crate::common::consts::{ - AGENT_VERSION, MAX_PING_FROM_LAST_PONG, WS_ACTIVE_CLOSE, WS_ACTIVE_CLOSE_CODE, - WS_CONNECT_TIMEOUT, WS_KERNEL_NAME_HEADER, WS_LAST_CLOSE_INTERVAL, WS_MSG_TYPE_ACK, - WS_MSG_TYPE_KICK, WS_PASSIVE_CLOSE, WS_PASSIVE_CLOSE_CODE, WS_RECONNECT_INTERVAL, WS_URL, - WS_VERSION_HEADER,VPCID_HEADER,VIP_HEADER + AGENT_VERSION, MAX_PING_FROM_LAST_PONG, VIP_HEADER, VPCID_HEADER, WS_ACTIVE_CLOSE, + WS_ACTIVE_CLOSE_CODE, WS_CONNECT_TIMEOUT, WS_KERNEL_NAME_HEADER, WS_LAST_CLOSE_INTERVAL, + WS_MSG_TYPE_ACK, WS_MSG_TYPE_KICK, WS_PASSIVE_CLOSE, WS_PASSIVE_CLOSE_CODE, + WS_RECONNECT_INTERVAL, WS_URL, WS_VERSION_HEADER, }; use crate::common::envs; use crate::types::inner_msg::KickMsg; use crate::types::ws_msg::WsMsg; -use crate::uname::Uname; use crate::uname::common::UnameExt; +use crate::uname::Uname; pub fn run( kick_sender: Sender, @@ -62,8 +61,6 @@ pub fn run( .send(ping_sender) .unwrap_or_exit("ping channel send fail"); - thread::sleep(time::Duration::from_secs(WS_RECONNECT_INTERVAL)); - let sender = kick_sender.clone(); let runner = ClientBuilder::new(WS_URL) @@ -112,6 +109,8 @@ pub fn run( info!("establishing new ws connection"); runtime.block_on(runner).or_log("ws runtime run failed"); + + thread::sleep(time::Duration::from_secs(WS_RECONNECT_INTERVAL)); } }); @@ -127,10 +126,7 @@ fn gen_ver_header() -> Headers { } if let Ok(uname) = Uname::new() { - headers.set_raw( - WS_KERNEL_NAME_HEADER, - vec![uname.sys_name().into_bytes()], - ); + headers.set_raw(WS_KERNEL_NAME_HEADER, vec![uname.sys_name().into_bytes()]); } debug!("ws header:{:?}", headers); diff --git a/tests/support/response.rs b/tests/support/response.rs index 09e39a8..d849cf5 100644 --- a/tests/support/response.rs +++ b/tests/support/response.rs @@ -78,6 +78,8 @@ pub fn tasks_response() -> Response { command: base64::encode("ls -l".as_bytes()), command_type: format!("SHELL"), working_directory: String::from("/root/"), + cos_bucket_url: "".to_string(), + cos_bucket_prefix: "".to_string(), }; let resp = DescribeTasksResponse { invocation_normal_task_set: vec![tasks],