diff --git a/Project.toml b/Project.toml index 016dd24..fe72d55 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "K8sDeputy" uuid = "2481ae95-212f-4650-bb21-d53ea3caf09f" authors = ["Beacon Biosignals, Inc"] -version = "0.1.4" +version = "0.2.0" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" @@ -13,7 +13,7 @@ Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" Aqua = "0.7" Dates = "1" HTTP = "1" -Mocking = "0.7" +Mocking = "0.7, 0.8" Sockets = "1" Test = "1" julia = "1.6" diff --git a/src/graceful_termination.jl b/src/graceful_termination.jl index 7732b5f..a4ef3c3 100644 --- a/src/graceful_termination.jl +++ b/src/graceful_termination.jl @@ -9,18 +9,17 @@ # Linux stores PID files and UNIX-domain sockets in `/run`. Users with K8s containers # utilizing read-only file systems should make use of a volume mount to allow K8sDeputy.jl -# to write to `/run`. Users can change the IPC directory by specifying `DEPUTY_IPC_DIR` but -# this is mainly just used for testing. -_deputy_ipc_dir() = get(ENV, "DEPUTY_IPC_DIR", "/run") +# to write to `/run`. +_ipc_dir() = "/run" # Write transient UNIX-domain sockets to the IPC directory. function _graceful_terminator_socket_path(pid::Int32) - return joinpath(_deputy_ipc_dir(), "graceful-terminator.$pid.socket") + return joinpath(@mock(_ipc_dir()), "graceful-terminator.$pid.socket") end # Following the Linux convention for pid files: # https://refspecs.linuxfoundation.org/FHS_3.0/fhs/ch03s15.html -entrypoint_pid_file() = joinpath(_deputy_ipc_dir(), "julia-entrypoint.pid") +entrypoint_pid_file() = joinpath(@mock(_ipc_dir()), "julia-entrypoint.pid") set_entrypoint_pid(pid::Integer) = write(entrypoint_pid_file(), string(pid) * "\n") function entrypoint_pid() diff --git a/test/graceful_termination.jl b/test/graceful_termination.jl index 1042005..2f5c4ac 100644 --- a/test/graceful_termination.jl +++ b/test/graceful_termination.jl @@ -1,20 +1,23 @@ @testset "graceful_terminator" begin - deputy_ipc_dir = mktempdir() - @testset "Julia entrypoint" begin code = quote - using K8sDeputy + using K8sDeputy, Mocking + Mocking.activate() + ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR + atexit(() -> @info "SHUTDOWN COMPLETE") - graceful_terminator() do - @info "GRACEFUL TERMINATION HANDLER" - exit(2) - return nothing + apply(ipc_dir_patch) do + graceful_terminator() do + @info "GRACEFUL TERMINATION HANDLER" + exit(2) + return nothing + end end sleep(60) end + code = join(code.args, '\n') # Evaluate at top-level cmd = `$(Base.julia_cmd()) --color=no -e $code` - cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir) buffer = IOBuffer() p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false) @test timedwait(() -> process_running(p), Second(5)) === :ok @@ -24,7 +27,7 @@ # When no PID is passed in the process ID is read from the Julia entrypoint file. # Blocks untils the process terminates. - withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do + apply(ipc_dir_patch) do @test graceful_terminate() === nothing end @@ -41,18 +44,23 @@ @testset "multiple Julia processes" begin code = quote - using K8sDeputy + using K8sDeputy, Mocking + Mocking.activate() + ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR + atexit(() -> @info "SHUTDOWN COMPLETE") - graceful_terminator(; set_entrypoint=false) do - @info "GRACEFUL TERMINATION HANDLER" - exit(2) - return nothing + apply(ipc_dir_patch) do + graceful_terminator(; set_entrypoint=false) do + @info "GRACEFUL TERMINATION HANDLER" + exit(2) + return nothing + end end sleep(60) end + code = join(code.args, '\n') # Evaluate at top-level cmd = `$(Base.julia_cmd()) --color=no -e $code` - cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir) buffer1 = IOBuffer() buffer2 = IOBuffer() p1 = run(pipeline(cmd; stdout=buffer1, stderr=buffer1); wait=false) @@ -63,7 +71,7 @@ sleep(3) # Blocks untils the process terminates - withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do + apply(ipc_dir_patch) do @test graceful_terminate(getpid(p1)) === nothing @test graceful_terminate(getpid(p2)) === nothing end @@ -80,33 +88,38 @@ @test output2 == expected end - # When users set `DEPUTY_IPC_DIR` they may be using a K8s volume. As even `emptyDir` - # volumes persist for the lifetime of the pod we may have a UNIX-domain socket already - # present from a previous restart. + # K8s volumes persist for the lifetime of the pod (even a `emptyDir`). If a container + # restarts we may already have a UNIX-domain socket present in the IPC directory. @testset "bind after restart" begin code = quote - using K8sDeputy + using K8sDeputy, Mocking using Sockets: listen + Mocking.activate() + ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR # Generate the socket as if the K8s pod had restarted and this path remained. # One difference with this setup is the process which created the socket is # still running so there may be a write lock on the file which definitely # wouldn't exist if the pod had restarted. Note that closing the socket will # remove the path. - socket_path = K8sDeputy._graceful_terminator_socket_path(getpid()) + socket_path = apply(ipc_dir_patch) do + return K8sDeputy._graceful_terminator_socket_path(getpid()) + end server = listen(socket_path) atexit(() -> @info "SHUTDOWN COMPLETE") - graceful_terminator(; set_entrypoint=false) do - @info "GRACEFUL TERMINATION HANDLER" - exit(2) - return nothing + apply(ipc_dir_patch) do + graceful_terminator(; set_entrypoint=false) do + @info "GRACEFUL TERMINATION HANDLER" + exit(2) + return nothing + end end sleep(60) end + code = join(code.args, '\n') # Evaluate at top-level cmd = `$(Base.julia_cmd()) --color=no -e $code` - cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir) buffer = IOBuffer() p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false) @test timedwait(() -> process_running(p), Second(5)) === :ok @@ -115,14 +128,14 @@ sleep(3) # Socket exists as a UNIX-domain socket - socket_path = withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do + socket_path = apply(ipc_dir_patch) do return K8sDeputy._graceful_terminator_socket_path(getpid(p)) end @test ispath(socket_path) @test !isfile(socket_path) # Blocks untils the process terminates - withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do + apply(ipc_dir_patch) do @test graceful_terminate(getpid(p)) === nothing end @test process_exited(p) diff --git a/test/health.jl b/test/health.jl index 87ced77..5c77bd1 100644 --- a/test/health.jl +++ b/test/health.jl @@ -192,27 +192,30 @@ end end @testset "graceful termination" begin - deputy_ipc_dir = mktempdir() port = rand(EPHEMERAL_PORT_RANGE) code = quote - using K8sDeputy, Sockets + using K8sDeputy, Mocking, Sockets + Mocking.activate() + ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR shutdown_handler() = @info "SHUTDOWN HANDLER" atexit(() -> @info "SHUTDOWN COMPLETE") deputy = Deputy(; shutdown_handler) - graceful_terminator(; set_entrypoint=false) do - @info "GRACEFUL TERMINATION HANDLER" - shutdown!(deputy) - return nothing + apply(ipc_dir_patch) do + graceful_terminator(; set_entrypoint=false) do + @info "GRACEFUL TERMINATION HANDLER" + shutdown!(deputy) + return nothing + end end K8sDeputy.serve!(deputy, Sockets.localhost, $port) readied!(deputy) sleep(60) end + code = join(code.args, '\n') # Evaluate at top-level cmd = `$(Base.julia_cmd()) --color=no -e $code` - cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir) buffer = IOBuffer() p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false) @test timedwait(() -> process_running(p), Second(5)) === :ok @@ -222,7 +225,7 @@ end end === :ok # Blocks untils the process terminates - withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do + apply(ipc_dir_patch) do return graceful_terminate(getpid(p)) end @test process_exited(p) diff --git a/test/runtests.jl b/test/runtests.jl index af18fe9..c14e121 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -12,6 +12,9 @@ const EPHEMERAL_PORT_RANGE = 49152:65535 Mocking.activate() +const IPC_DIR = mktempdir() +ipc_dir_patch = @patch K8sDeputy._ipc_dir() = IPC_DIR + @testset "K8sDeputy.jl" begin @testset "Aqua" begin Aqua.test_all(K8sDeputy; ambiguities=false)