Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop support for DEPUTY_IPC_DIR env var #14

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "K8sDeputy"
uuid = "2481ae95-212f-4650-bb21-d53ea3caf09f"
authors = ["Beacon Biosignals, Inc"]
version = "0.1.4"
version = "0.2.0"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand All @@ -13,7 +13,7 @@ Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
Aqua = "0.7"
Dates = "1"
HTTP = "1"
Mocking = "0.7"
Mocking = "0.7, 0.8"
Sockets = "1"
Test = "1"
julia = "1.6"
Expand Down
9 changes: 4 additions & 5 deletions src/graceful_termination.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@

# Linux stores PID files and UNIX-domain sockets in `/run`. Users with K8s containers
# utilizing read-only file systems should make use of a volume mount to allow K8sDeputy.jl
# to write to `/run`. Users can change the IPC directory by specifying `DEPUTY_IPC_DIR` but
# this is mainly just used for testing.
_deputy_ipc_dir() = get(ENV, "DEPUTY_IPC_DIR", "/run")
# to write to `/run`.
_ipc_dir() = "/run"

Check warning on line 13 in src/graceful_termination.jl

View check run for this annotation

Codecov / codecov/patch

src/graceful_termination.jl#L13

Added line #L13 was not covered by tests

# Write transient UNIX-domain sockets to the IPC directory.
function _graceful_terminator_socket_path(pid::Int32)
return joinpath(_deputy_ipc_dir(), "graceful-terminator.$pid.socket")
return joinpath(@mock(_ipc_dir()), "graceful-terminator.$pid.socket")
end

# Following the Linux convention for pid files:
# https://refspecs.linuxfoundation.org/FHS_3.0/fhs/ch03s15.html
entrypoint_pid_file() = joinpath(_deputy_ipc_dir(), "julia-entrypoint.pid")
entrypoint_pid_file() = joinpath(@mock(_ipc_dir()), "julia-entrypoint.pid")
set_entrypoint_pid(pid::Integer) = write(entrypoint_pid_file(), string(pid) * "\n")

function entrypoint_pid()
Expand Down
69 changes: 41 additions & 28 deletions test/graceful_termination.jl
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
@testset "graceful_terminator" begin
deputy_ipc_dir = mktempdir()

@testset "Julia entrypoint" begin
code = quote
using K8sDeputy
using K8sDeputy, Mocking
Mocking.activate()
ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR

atexit(() -> @info "SHUTDOWN COMPLETE")
graceful_terminator() do
@info "GRACEFUL TERMINATION HANDLER"
exit(2)
return nothing
apply(ipc_dir_patch) do
graceful_terminator() do
@info "GRACEFUL TERMINATION HANDLER"
exit(2)
return nothing
end
end
sleep(60)
end
code = join(code.args, '\n') # Evaluate at top-level

cmd = `$(Base.julia_cmd()) --color=no -e $code`
cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir)
buffer = IOBuffer()
p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false)
@test timedwait(() -> process_running(p), Second(5)) === :ok
Expand All @@ -24,7 +27,7 @@

# When no PID is passed in the process ID is read from the Julia entrypoint file.
# Blocks untils the process terminates.
withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do
apply(ipc_dir_patch) do
@test graceful_terminate() === nothing
end

Expand All @@ -41,18 +44,23 @@

@testset "multiple Julia processes" begin
code = quote
using K8sDeputy
using K8sDeputy, Mocking
Mocking.activate()
ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR

atexit(() -> @info "SHUTDOWN COMPLETE")
graceful_terminator(; set_entrypoint=false) do
@info "GRACEFUL TERMINATION HANDLER"
exit(2)
return nothing
apply(ipc_dir_patch) do
graceful_terminator(; set_entrypoint=false) do
@info "GRACEFUL TERMINATION HANDLER"
exit(2)
return nothing
end
end
sleep(60)
end
code = join(code.args, '\n') # Evaluate at top-level

cmd = `$(Base.julia_cmd()) --color=no -e $code`
cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir)
buffer1 = IOBuffer()
buffer2 = IOBuffer()
p1 = run(pipeline(cmd; stdout=buffer1, stderr=buffer1); wait=false)
Expand All @@ -63,7 +71,7 @@
sleep(3)

# Blocks untils the process terminates
withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do
apply(ipc_dir_patch) do
@test graceful_terminate(getpid(p1)) === nothing
@test graceful_terminate(getpid(p2)) === nothing
end
Expand All @@ -80,33 +88,38 @@
@test output2 == expected
end

# When users set `DEPUTY_IPC_DIR` they may be using a K8s volume. As even `emptyDir`
# volumes persist for the lifetime of the pod we may have a UNIX-domain socket already
# present from a previous restart.
# K8s volumes persist for the lifetime of the pod (even a `emptyDir`). If a container
# restarts we may already have a UNIX-domain socket present in the IPC directory.
@testset "bind after restart" begin
code = quote
using K8sDeputy
using K8sDeputy, Mocking
using Sockets: listen
Mocking.activate()
ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR

# Generate the socket as if the K8s pod had restarted and this path remained.
# One difference with this setup is the process which created the socket is
# still running so there may be a write lock on the file which definitely
# wouldn't exist if the pod had restarted. Note that closing the socket will
# remove the path.
socket_path = K8sDeputy._graceful_terminator_socket_path(getpid())
socket_path = apply(ipc_dir_patch) do
return K8sDeputy._graceful_terminator_socket_path(getpid())
end
server = listen(socket_path)

atexit(() -> @info "SHUTDOWN COMPLETE")
graceful_terminator(; set_entrypoint=false) do
@info "GRACEFUL TERMINATION HANDLER"
exit(2)
return nothing
apply(ipc_dir_patch) do
graceful_terminator(; set_entrypoint=false) do
@info "GRACEFUL TERMINATION HANDLER"
exit(2)
return nothing
end
end
sleep(60)
end
code = join(code.args, '\n') # Evaluate at top-level

cmd = `$(Base.julia_cmd()) --color=no -e $code`
cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir)
buffer = IOBuffer()
p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false)
@test timedwait(() -> process_running(p), Second(5)) === :ok
Expand All @@ -115,14 +128,14 @@
sleep(3)

# Socket exists as a UNIX-domain socket
socket_path = withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do
socket_path = apply(ipc_dir_patch) do
return K8sDeputy._graceful_terminator_socket_path(getpid(p))
end
@test ispath(socket_path)
@test !isfile(socket_path)

# Blocks untils the process terminates
withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do
apply(ipc_dir_patch) do
@test graceful_terminate(getpid(p)) === nothing
end
@test process_exited(p)
Expand Down
19 changes: 11 additions & 8 deletions test/health.jl
Original file line number Diff line number Diff line change
Expand Up @@ -192,27 +192,30 @@ end
end

@testset "graceful termination" begin
deputy_ipc_dir = mktempdir()
port = rand(EPHEMERAL_PORT_RANGE)
code = quote
using K8sDeputy, Sockets
using K8sDeputy, Mocking, Sockets
Mocking.activate()
ipc_dir_patch = @patch K8sDeputy._ipc_dir() = $IPC_DIR

shutdown_handler() = @info "SHUTDOWN HANDLER"
atexit(() -> @info "SHUTDOWN COMPLETE")

deputy = Deputy(; shutdown_handler)
graceful_terminator(; set_entrypoint=false) do
@info "GRACEFUL TERMINATION HANDLER"
shutdown!(deputy)
return nothing
apply(ipc_dir_patch) do
graceful_terminator(; set_entrypoint=false) do
@info "GRACEFUL TERMINATION HANDLER"
shutdown!(deputy)
return nothing
end
end
K8sDeputy.serve!(deputy, Sockets.localhost, $port)
readied!(deputy)
sleep(60)
end
code = join(code.args, '\n') # Evaluate at top-level

cmd = `$(Base.julia_cmd()) --color=no -e $code`
cmd = addenv(cmd, "DEPUTY_IPC_DIR" => deputy_ipc_dir)
buffer = IOBuffer()
p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false)
@test timedwait(() -> process_running(p), Second(5)) === :ok
Expand All @@ -222,7 +225,7 @@ end
end === :ok

# Blocks untils the process terminates
withenv("DEPUTY_IPC_DIR" => deputy_ipc_dir) do
apply(ipc_dir_patch) do
return graceful_terminate(getpid(p))
end
@test process_exited(p)
Expand Down
3 changes: 3 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ const EPHEMERAL_PORT_RANGE = 49152:65535

Mocking.activate()

const IPC_DIR = mktempdir()
ipc_dir_patch = @patch K8sDeputy._ipc_dir() = IPC_DIR

@testset "K8sDeputy.jl" begin
@testset "Aqua" begin
Aqua.test_all(K8sDeputy; ambiguities=false)
Expand Down
Loading