diff --git a/.JuliaFormatter.toml b/.JuliaFormatter.toml new file mode 100644 index 0000000..857c3ae --- /dev/null +++ b/.JuliaFormatter.toml @@ -0,0 +1 @@ +style = "yas" diff --git a/.github/workflows/CI.yaml b/.github/workflows/CI.yaml new file mode 100644 index 0000000..5c0dede --- /dev/null +++ b/.github/workflows/CI.yaml @@ -0,0 +1,59 @@ +--- +name: CI +on: + workflow_dispatch: + push: + branches: + - main + tags: ["*"] + paths: + - "src/**" + - "test/**" + - "Project.toml" + - ".github/workflows/CI.yaml" + pull_request: + paths: + - "src/**" + - "test/**" + - "Project.toml" + - ".github/workflows/CI.yaml" +concurrency: + # Skip intermediate builds: always. + # Cancel intermediate builds: only if it is a pull request build. + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ startsWith(github.ref, 'refs/pull/') }} +jobs: + test: + name: Julia ${{ matrix.version }} - ${{ matrix.runs-on }} - ${{ matrix.arch }} - ${{ matrix.threads}} threads + # These permissions are needed to: + # - Delete old caches: https://github.com/julia-actions/cache#cache-retention + permissions: + actions: write + contents: read + runs-on: ${{ matrix.runs-on }} + strategy: + fail-fast: false + matrix: + version: + - "1.6" # Earliest version of Julia that the package is compatible with + - "1" # Latest Julia release + runs-on: + - ubuntu-latest + arch: + - x64 + threads: + - 1 + env: + JULIA_NUM_THREADS: ${{ matrix.threads }} + steps: + - uses: actions/checkout@v4 + - uses: julia-actions/setup-julia@v1 + with: + version: ${{ matrix.version }} + arch: ${{ matrix.arch }} + - uses: julia-actions/cache@v1 + - uses: julia-actions/julia-runtest@v1 + - uses: julia-actions/julia-processcoverage@v1 + - uses: codecov/codecov-action@v3 + with: + file: lcov.info diff --git a/.github/workflows/DocPreviewCleanup.yaml b/.github/workflows/DocPreviewCleanup.yaml new file mode 100644 index 0000000..bee2b01 --- /dev/null +++ b/.github/workflows/DocPreviewCleanup.yaml @@ -0,0 +1,39 @@ 
+--- +# remove PR previews once they're merged +# +name: Doc Preview Cleanup +on: + pull_request: + types: [closed] + +# Ensure that only one "Doc Preview Cleanup" workflow is force pushing at a time +concurrency: + group: doc-preview-cleanup + cancel-in-progress: false + +jobs: + doc-preview-cleanup: + runs-on: ubuntu-latest + permissions: + contents: write + env: + PR: ${{ github.event.number }} + steps: + - name: Checkout gh-pages branch + uses: actions/checkout@v4 + with: + ref: gh-pages + - name: Delete preview and history + push changes + run: | + preview_dir="previews/PR${PR?}" + if [ -d "${preview_dir}" ]; then + # Delete preview directory created by this PR + git rm -rf "${preview_dir}" + + # Commit the removed preview directories and truncate history + git config user.name "Documenter.jl" + git config user.email "documenter@juliadocs.github.io" + git commit -m "delete preview" + git branch gh-pages-new $(echo "squash history" | git commit-tree HEAD^{tree}) + git push --force origin gh-pages-new:gh-pages + fi diff --git a/.github/workflows/Documenter.yaml b/.github/workflows/Documenter.yaml new file mode 100644 index 0000000..ad2b5f2 --- /dev/null +++ b/.github/workflows/Documenter.yaml @@ -0,0 +1,58 @@ +--- +name: Documenter +on: + workflow_dispatch: + push: + tags: ["*"] + branches: + - main + paths: + - "docs/**" + - "src/**" + - "Project.toml" + - ".github/workflows/Documenter.yaml" + pull_request: + paths: + - "docs/**" + - "src/**" + - "Project.toml" + - ".github/workflows/Documenter.yaml" + - ".github/workflows/DocPreviewCleanup.yaml" +jobs: + docs: + name: Build + # These permissions are needed to: + # - Delete old caches: https://github.com/julia-actions/cache#usage + permissions: + actions: write + contents: read + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: julia-actions/setup-julia@v1 + with: + version: "1" + show-versioninfo: true + - uses: julia-actions/cache@v1 + - name: Install dependencies + shell: julia 
--project=docs --color=yes {0} + run: | + using Pkg + Pkg.develop(PackageSpec(path=pwd())) + Pkg.instantiate() + - name: Build docs + uses: julia-actions/julia-docdeploy@v1 + with: + install-package: false # Avoid instantiating twice + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Preview URL + if: ${{ github.event_name == 'pull_request' }} + run: | + repo_owner="${repo%/*}" # e.g. JuliaLang + repo_name="${repo#*/}" # e.g. Example.jl + echo ":books: Documentation preview available at:" | tee -a "$GITHUB_STEP_SUMMARY" + echo "<https://${repo_owner}.github.io/${repo_name}/previews/PR${PR}>" | tee -a "$GITHUB_STEP_SUMMARY" + env: + repo: ${{ github.repository }} # e.g. JuliaLang/Example.jl + PR: ${{ github.event.number }} diff --git a/.github/workflows/FormatCheck.yaml b/.github/workflows/FormatCheck.yaml new file mode 100644 index 0000000..6b6df9b --- /dev/null +++ b/.github/workflows/FormatCheck.yaml @@ -0,0 +1,46 @@ +--- +name: Format Check +on: + push: + branches: + - main + tags: ["*"] + paths: + - "**/*.jl" + - ".github/workflows/FormatCheck.yaml" + pull_request: + paths: + - "**/*.jl" + - ".github/workflows/FormatCheck.yaml" +jobs: + format-check: + name: Julia + # These permissions are needed to: + # - Delete old caches: https://github.com/julia-actions/cache#usage + # - Post formatting suggestions: https://github.com/reviewdog/action-suggester#required-permissions + permissions: + actions: write + contents: read + pull-requests: write + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: julia-actions/setup-julia@v1 + with: + version: "1" + - uses: julia-actions/cache@v1 + - name: Install JuliaFormatter + shell: julia --project=@format --color=yes {0} + run: | + using Pkg + Pkg.add(PackageSpec(; name="JuliaFormatter", version="1")) + - name: Check formatting + shell: julia --project=@format --color=yes {0} + run: | + using JuliaFormatter + format("."; verbose=true) || exit(1) + # Add formatting suggestions to non-draft PRs even when "Check formatting" fails + - uses: 
reviewdog/action-suggester@v1 + if: ${{ !cancelled() && github.event_name == 'pull_request' && github.event.pull_request.draft == false }} + with: + tool_name: JuliaFormatter diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..06ba5eb --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +docs/build +Manifest.toml diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3dd4d7e --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2024 Beacon Biosignals, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
\ No newline at end of file diff --git a/Project.toml b/Project.toml new file mode 100644 index 0000000..85bb45d --- /dev/null +++ b/Project.toml @@ -0,0 +1,26 @@ +name = "K8sDeputy" +uuid = "2481ae95-212f-4650-bb21-d53ea3caf09f" +authors = ["Beacon Biosignals, Inc"] +version = "0.1.0" + +[deps] +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" +Mocking = "78c3b35d-d492-501b-9361-3d52fe80e533" +Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" + +[compat] +Aqua = "0.7" +Dates = "1" +HTTP = "1" +Mocking = "0.7" +Sockets = "1" +Test = "1" +julia = "1.6" + +[extras] +Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[targets] +test = ["Aqua", "Test"] diff --git a/README.md b/README.md index 6ea28a5..1552f28 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,8 @@ # K8sDeputy.jl -Provides K8s health checks and graceful termination support on behalf of Julia services + +[![CI](https://github.com/beacon-biosignals/K8sDeputy.jl/actions/workflows/CI.yaml/badge.svg)](https://github.com/beacon-biosignals/K8sDeputy.jl/actions/workflows/CI.yaml) +[![Code Style: YASGuide](https://img.shields.io/badge/code%20style-yas-violet.svg)](https://github.com/jrevels/YASGuide) +[![Stable Documentation](https://img.shields.io/badge/docs-stable-blue.svg)](https://beacon-biosignals.github.io/K8sDeputy.jl/stable) +[![Dev Documentation](https://img.shields.io/badge/docs-dev-blue.svg)](https://beacon-biosignals.github.io/K8sDeputy.jl/dev) + +Provides K8s health checks and graceful termination support on behalf of Julia services. 
diff --git a/docs/Project.toml b/docs/Project.toml new file mode 100644 index 0000000..95d1b1a --- /dev/null +++ b/docs/Project.toml @@ -0,0 +1,6 @@ +[deps] +Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" +K8sDeputy = "2481ae95-212f-4650-bb21-d53ea3caf09f" + +[compat] +Documenter = "1.0.0" diff --git a/docs/make.jl b/docs/make.jl new file mode 100644 index 0000000..040578f --- /dev/null +++ b/docs/make.jl @@ -0,0 +1,18 @@ +using K8sDeputy +using Documenter + +pages = ["Home" => "index.md", + "Quickstart" => "quickstart.md", + "Health Checks" => "health_checks.md", + "Graceful Termination" => "graceful_termination.md", + "API" => "api.md"] + +makedocs(; modules=[K8sDeputy], + format=Documenter.HTML(; prettyurls=get(ENV, "CI", nothing) == "true"), + sitename="K8sDeputy.jl", + authors="Beacon Biosignals", + pages) + +deploydocs(; repo="github.com/beacon-biosignals/K8sDeputy.jl.git", + push_preview=true, + devbranch="main") diff --git a/docs/src/api.md b/docs/src/api.md new file mode 100644 index 0000000..06460bc --- /dev/null +++ b/docs/src/api.md @@ -0,0 +1,10 @@ +# API + +```@docs +Deputy +K8sDeputy.serve! +readied! +shutdown! +graceful_terminator +graceful_terminate +``` diff --git a/docs/src/graceful_termination.md b/docs/src/graceful_termination.md new file mode 100644 index 0000000..3a1d1de --- /dev/null +++ b/docs/src/graceful_termination.md @@ -0,0 +1,97 @@ +# Graceful Termination + +Kubernetes (K8s) applications are expected to handle [graceful termination](https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace). Typically, +applications will initiate a graceful termination by handling the `TERM` signal when a K8s pod is to be terminated. 
+ +At this point in time [Julia does not provide user-definable signal handlers](https://github.com/JuliaLang/julia/issues/14675) and the internal Julia signal handler for `TERM` results in the process reporting the signal (with a stack trace) to standard error and exiting. Julia provides users with [`atexit`](https://docs.julialang.org/en/v1/base/base/#Base.atexit) to define callbacks when Julia is terminating but unfortunately this callback system only allows for trivial actions to occur when Julia is shut down due to handling the `TERM` signal. + +These limitations resulted in K8sDeputy.jl providing an alternative path for handling graceful termination of Julia processes. This avoids logging unnecessary error messages and also provides a reliable shutdown callback system for graceful termination. + +## Interface + +The K8sDeputy.jl package provides the `graceful_terminator` function for registering a single user callback upon receiving a graceful termination event. The `graceful_terminate` function can be used from another Julia process to terminate the `graceful_terminator` caller process. For example run the following code in an interactive Julia REPL: + +```julia +using K8sDeputy +graceful_terminator(() -> (@info "Gracefully terminating..."; exit())) +``` + +In another terminal run the following code to initiate graceful termination: + +```sh +julia -e 'using K8sDeputy; graceful_terminate()' +``` + +Once `graceful_terminate` has been called the first process will: execute the callback, log the message, and exit the Julia process. + +!!! note + + By default the `graceful_terminator` function registers the caller Julia process as the "entrypoint" Julia process. Primarily, this allows for out-of-the-box support for Julia + applications running as non-[init](https://en.wikipedia.org/wiki/Init) processes but only allows one Julia process to be defined as the "entrypoint". 
If you require multiple Julia processes to support graceful termination concurrently you can use `set_entrypoint=false` (e.g. `graceful_terminator(...; set_entrypoint=false)`) and pass in the target process ID to `graceful_terminate`. + +## Deputy Integration + +The `graceful_terminator` function can be combined with the deputy's `shutdown!` function to allow graceful termination of the application and the deputy: + +```julia +using K8sDeputy +deputy = Deputy(; shutdown_handler=() -> @info "Shutting down") +server = K8sDeputy.serve!(deputy, "0.0.0.0") +graceful_terminator(() -> shutdown!(deputy)) + +# Application code +``` + +## Kubernetes Setup + +To configure your K8s container resource to call `graceful_terminate` when terminating you can configure a [`preStop` hook](https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/#container-hooks): + +```yaml +apiVersion: v1 +kind: Pod +spec: + containers: + - name: app + # command: ["/bin/sh", "-c", "julia entrypoint.jl; sleep 1"] + lifecycle: + preStop: + exec: + command: ["julia", "-e", "using K8sDeputy; graceful_terminate()"] + # terminationGracePeriodSeconds: 30 +``` + +!!! note + + Applications with slow shutdown callbacks may want to consider specifying `terminationGracePeriodSeconds` which specifies the maximum duration a pod can take when gracefully terminating. Once the timeout is reached the processes running in the pod are forcibly halted with a `KILL` signal. + +Finally, the entrypoint for the container should also not directly use Julia as the [init](https://en.wikipedia.org/wiki/Init) process (PID 1). Instead, users should define their entrypoint similarly to +`["/bin/sh", "-c", "julia entrypoint.jl; sleep 1"]` as this allows both the Julia process and the `preStop` process to cleanly terminate. + +### Read-only Filesystem + +If you have a read-only filesystem on your container you'll need to configure a writeable volume mount for K8sDeputy.jl. 
The `DEPUTY_IPC_DIR` environmental variable can be used to instruct K8sDeputy.jl where to store the named pipes it creates for interprocess communication: + +```yaml +apiVersion: v1 +kind: Pod +spec: + containers: + - name: app + # command: ["/bin/sh", "-c", "julia entrypoint.jl; sleep 1"] + env: + - name: DEPUTY_IPC_DIR + value: /mnt/deputy-ipc + lifecycle: + preStop: + exec: + command: ["julia", "-e", "using K8sDeputy; graceful_terminate()"] + securityContext: + readOnlyRootFilesystem: true + volumeMounts: + - mountPath: /mnt/deputy-ipc + name: deputy-ipc + volumes: + - name: deputy-ipc + emptyDir: + medium: Memory +``` diff --git a/docs/src/health_checks.md b/docs/src/health_checks.md new file mode 100644 index 0000000..a2228cd --- /dev/null +++ b/docs/src/health_checks.md @@ -0,0 +1,114 @@ +# Health Checks + +K8sDeputy.jl provides the following health endpoints: + +- `/health/live` +- `/health/ready` + +These endpoints respond with HTTP status `200 OK` on success or `503 Service Unavailable` on failure. + +## Supporting liveness probes + +In order to enable [liveness probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command) you will need to start the K8sDeputy health check server from within your application: + +```julia +using K8sDeputy +deputy = Deputy() +K8sDeputy.serve!(deputy, "0.0.0.0") + +# Application code +``` + +!!! note + + We specify the HTTP service to listen to all addresses (i.e. `0.0.0.0`) on the container as the K8s [kubelet](https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet/) which uses the `livenessProbe` executes the requests from outside of the container. + +Once `K8sDeputy.serve!` has been called the HTTP based liveness endpoint should now return successful responses. + +Probe requests prior to running `K8sDeputy.serve!` will return failure responses. 
Application developers should consider starting the health check endpoints before running slow application initialization code. Alternatively, an [`initialDelaySeconds`](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#configure-probes) can be added to the `livenessProbe`. + +You'll also need to configure your K8s container resource to specify the `livenessProbe`. For example here's a partial manifest for a K8s pod: + +```yaml +apiVersion: v1 +kind: Pod +spec: + containers: + - name: app + ports: + - name: health-check + containerPort: 8081 # The default K8sDeputy.jl health check port + protocol: TCP + livenessProbe: + httpGet: + path: /health/live + port: health-check + timeoutSeconds: 5 +``` + +!!! note + + K8s probes require that applications must respond to the probe requests in under `timeoutSeconds` (defaults to 1 second). Since Julia's HTTP.jl server can be unresponsive we recommend using a `timeoutSeconds` of at least 5 seconds. + +## Supporting readiness probes + +Enabling [readiness probes](https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-readiness-probes) is similar to [enabling the liveness probes](#supporting-liveness-probes) but requires a call to `readied!`: + +```julia +using K8sDeputy +deputy = Deputy() +K8sDeputy.serve!(deputy, "0.0.0.0") + +# Application initialization code + +readied!(deputy) + +# Application code +``` + +When your application is ready you should declare your application as such with `readied!`. Doing this causes the readiness endpoint to start returning successful responses. For K8s applications responding to network traffic this endpoint is critical for ensuring timely responses to external requests. Although, defining `readied!` for non-network based applications is optional it can still be useful for administration/monitoring. 
+ +To configure your K8s container resource with a readiness probe you'll need to declare a `readinessProbe` in your manifest. For example here's a partial manifest for a K8s pod: + +```yaml +apiVersion: v1 +kind: Pod +spec: + containers: + - name: app + ports: + - name: health-check + containerPort: 8081 # Default K8sDeputy.jl health check port + protocol: TCP + readinessProbe: + httpGet: + path: /health/ready + port: health-check + timeoutSeconds: 5 +``` + +## Shutdown + +When it is time to shut down your application you should inform the deputy by running the `shutdown!` function: + +```julia +using K8sDeputy +deputy = Deputy(; shutdown_handler=() -> @info "Shutting down") +K8sDeputy.serve!(deputy, "0.0.0.0") + +try + # Application code +finally + shutdown!(deputy) +end +``` + +Once `shutdown!` is called the following occurs: + +1. The liveness endpoint starts returning failure responses +2. The deputy's `shutdown_handler` is called +3. The Julia process is terminated + +By default the `shutdown_handler` only has 5 seconds to complete. If your `shutdown_handler` requires more time to execute you can change the timeout by using the keyword `shutdown_handler_timeout`. + +Depending on your application you may want to define multiple calls to `shutdown!`. For example you may want to call `shutdown!` from within `graceful_terminator` to enable [graceful termination support](./graceful_termination.md) for your application. 
diff --git a/docs/src/index.md b/docs/src/index.md new file mode 100644 index 0000000..1552f28 --- /dev/null +++ b/docs/src/index.md @@ -0,0 +1,8 @@ +# K8sDeputy.jl + +[![CI](https://github.com/beacon-biosignals/K8sDeputy.jl/actions/workflows/CI.yaml/badge.svg)](https://github.com/beacon-biosignals/K8sDeputy.jl/actions/workflows/CI.yaml) +[![Code Style: YASGuide](https://img.shields.io/badge/code%20style-yas-violet.svg)](https://github.com/jrevels/YASGuide) +[![Stable Documentation](https://img.shields.io/badge/docs-stable-blue.svg)](https://beacon-biosignals.github.io/K8sDeputy.jl/stable) +[![Dev Documentation](https://img.shields.io/badge/docs-dev-blue.svg)](https://beacon-biosignals.github.io/K8sDeputy.jl/dev) + +Provides K8s health checks and graceful termination support on behalf of Julia services. diff --git a/docs/src/quickstart.md b/docs/src/quickstart.md new file mode 100644 index 0000000..4704bd2 --- /dev/null +++ b/docs/src/quickstart.md @@ -0,0 +1,64 @@ +# Quickstart + +For users who want to get started quickly you can use the following template to incorporate liveness probes, readiness probes, graceful termination, binding to non-privileged ports, and read-only filesystem support. + +1. Add K8sDeputy.jl to your Julia project: `Pkg.add("K8sDeputy")` +2. Define the following `entrypoint.jl` in your application and include it in the `WORKDIR` of your `Dockerfile`: + + ```julia + using K8sDeputy + deputy = Deputy() + server = K8sDeputy.serve!(deputy, "0.0.0.0") + graceful_terminator(() -> shutdown!(deputy)) + + # Application initialization code + + readied!(deputy) + + # Application code + ``` + +3. 
Incorporate the following changes into your K8s resource manifest: + + ```yaml + apiVersion: v1 + kind: Pod + spec: + containers: + - name: app + command: ["/bin/sh", "-c", "julia entrypoint.jl; sleep 1"] + env: + - name: DEPUTY_IPC_DIR + value: /mnt/deputy-ipc + ports: + - name: health-check + containerPort: 8081 # Default K8sDeputy.jl health check port + protocol: TCP + livenessProbe: + httpGet: + path: /health/live + port: health-check + timeoutSeconds: 5 + readinessProbe: + httpGet: + path: /health/ready + port: health-check + timeoutSeconds: 5 + lifecycle: + preStop: + exec: + command: ["julia", "-e", "using K8sDeputy; graceful_terminate()"] + securityContext: + capabilities: + drop: + - all + readOnlyRootFilesystem: true + volumeMounts: + - mountPath: /mnt/deputy-ipc + name: deputy-ipc + terminationGracePeriodSeconds: 30 + volumes: + - name: deputy-ipc + emptyDir: + medium: Memory + ``` diff --git a/src/K8sDeputy.jl b/src/K8sDeputy.jl new file mode 100644 index 0000000..16daeea --- /dev/null +++ b/src/K8sDeputy.jl @@ -0,0 +1,14 @@ +module K8sDeputy + +using Dates: Period, Second +using HTTP: HTTP +using Mocking +using Sockets: accept, connect, listen, localhost + +export Deputy, graceful_terminator, readied!, shutdown!, graceful_terminate + +include("graceful_termination.jl") +include("health.jl") +include("server.jl") + +end # module K8sDeputy diff --git a/src/graceful_termination.jl b/src/graceful_termination.jl new file mode 100644 index 0000000..f3605aa --- /dev/null +++ b/src/graceful_termination.jl @@ -0,0 +1,162 @@ +# Julia lacks user-defined signal handling and the default behavior for critical signals +# (i.e. SIGTERM, SIGABRT, SIGQUIT) is to report the signal and show a stack trace. As K8s +# utilizes SIGTERM by default to gracefully shutdown pods and we want to avoid logging +# unnecessary stack traces we will utilize a `preStop` container hook as an alternative. 
+# +# Note it is possible to use the C function `sigaction` with a Julia callback function but +# from experimenting with this there are a few issues such as being unable to use locks or +# printing (`jl_safe_printf` does work). + +# Linux typically stores PID files in `/run` which requires root access. For systems with +# read-only file systems we need to support a user specified writable volume. +_deputy_ipc_dir() = get(tempdir, ENV, "DEPUTY_IPC_DIR") + +# Prefer using UNIX domain sockets but if the `DEPUTY_IPC_DIR` is set assume the file +# system is read-only and use a named pipe instead. +function _socket_path(name) + return haskey(ENV, "DEPUTY_IPC_DIR") ? joinpath(_deputy_ipc_dir(), name) : name +end + +# Following the Linux convention for pid files: +# https://refspecs.linuxfoundation.org/FHS_3.0/fhs/ch03s15.html +entrypoint_pid_file() = joinpath(_deputy_ipc_dir(), "julia-entrypoint.pid") +set_entrypoint_pid(pid::Integer) = write(entrypoint_pid_file(), string(pid) * "\n") + +function entrypoint_pid() + pid_file = entrypoint_pid_file() + return isfile(pid_file) ? parse(Int, readchomp(pid_file)) : 1 +end + +# https://docs.libuv.org/en/v1.x/process.html#c.uv_kill +uv_kill(pid::Integer, signum::Integer) = ccall(:uv_kill, Cint, (Cint, Cint), pid, signum) + +""" + graceful_terminator(f; set_entrypoint::Bool=true) -> Nothing + +Register a zero-argument function to be called when `graceful_terminate` is called targeting +this process. The user-defined function `f` is expected to call `exit` to terminate the +Julia process. The `graceful_terminator` function is only allowed to be called once within a +Julia process. + +## Keywords + +- `set_entrypoint::Bool` (optional): Sets the calling Julia process as the "entrypoint" to + be targeted by default when running `graceful_terminate` in another Julia process. 
Users + who want to utilize `graceful_terminator` in multiple Julia processes should use + `set_entrypoint=false` and specify process IDs when calling `graceful_terminate`. Defaults + to `true`. + +## Examples + +```julia +app_status = AppStatus() +graceful_terminator(() -> shutdown!(app_status)) +``` +## Kubernetes Setup + +When using Kubernetes (K8s) you can enable [graceful termination](https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace) +of a Julia process by defining a pod [`preStop`](https://kubernetes.io/docs/concepts/containers/container-lifecycle-hooks/#container-hooks) +container hook. Typically, K8s initiates graceful termination via the `TERM` signal but +as Julia forcefully terminates when receiving this signal and Julia does not support +user-defined signal handlers we utilize `preStop` instead. + +The following K8s pod manifest snippet will specify K8s to call the user-defined function +specified by the `graceful_terminator`: + +```yaml +spec: + containers: + - lifecycle: + preStop: + exec: + command: ["julia", "-e", "using $(@__MODULE__()); graceful_terminate()"] +``` + +Additionally, the entrypoint for the container should also not directly use the Julia process +as the init process (PID 1). Instead, users should define their entrypoint similarly to +`["/bin/sh", "-c", "julia entrypoint.jl; sleep 1"]` as this allows for both the Julia +process and the `preStop` process to cleanly terminate. +""" +function graceful_terminator(f; set_entrypoint::Bool=true) + set_entrypoint && set_entrypoint_pid(getpid()) + + # Utilize UNIX domain sockets for the IPC. Avoid using network sockets here as we don't + # want to allow access to this functionality from outside of the localhost. Each process + # uses a distinct socket name allowing for multiple Julia processes to allow independent + # use of the graceful terminator. 
+ server = listen(_socket_path("graceful-terminator.$(getpid())")) + + t = Threads.@spawn begin + while isopen(server) + sock = accept(server) + request = readline(sock) + + if request == "terminate" + try + f() # Expecting user-defined function to call `exit` + catch e + @error "User graceful terminator callback failed with exception:\n" * + sprint(showerror, e, catch_backtrace()) + end + else + @warn "Graceful terminator received an invalid request: \"$request\"" + end + + close(sock) + end + end + + # Useful only to report internal errors + @static if VERSION >= v"1.7.0-DEV.727" + errormonitor(t) + end + + return nothing +end + +""" + graceful_terminate(pid::Integer=entrypoint_pid(); wait::Bool=true) -> Nothing + +Initiates the execution of the `graceful_terminator` user callback in the process `pid`. See +`graceful_terminator` for more details. +""" +function graceful_terminate(pid::Integer=entrypoint_pid(); wait::Bool=true) + # Note: The follow dead code has been left here purposefully as an example of how to + # view output when running via `preStop`. + # + # As K8s doesn't provide a way to view the logs from the `preStop` command you can work + # a round this by writing to the STDOUT of the `pid`. Only works while `pid` is running. + # https://stackoverflow.com/a/70708744 + # open("/proc/$pid/fd/1", "w") do io + # println(io, "preStop called") + # end + + sock = connect(_socket_path("graceful-terminator.$pid")) + println(sock, "terminate") + close(sock) + + # Wait for the `pid` to complete. We must block here as otherwise K8s sends a + # `TERM` signal immediately after the `preStop` completes. If we fail to wait the + # Julia process won't have a chance to perform a "clean" shutdown. If the Julia process + # takes longer than `terminationGracePeriodSeconds` to stop then K8s will forcefully + # terminate the with the `KILL` signal. + # + # The `preStop` must complete before the container terminates otherwise K8s will + # report a `FailedPreStopHook` event. 
To avoid seeing this warning the Julia process + # should not be run directly as the container entrypoint but rather run as a subprocess + # of the entrypoint with a delay after the subprocess' termination. Doing this allows + # both the target Julia process and the `preStop` process to exit cleanly. + # + # https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace + # https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-termination + if wait + # The special "signal" 0 is used to check for process existence. + # https://man7.org/linux/man-pages/man2/kill.2.html + while uv_kill(pid, 0) == 0 + # Polling frequency should ideally be faster than the post-termination delay + sleep(0.1) + end + end + + return nothing +end diff --git a/src/health.jl b/src/health.jl new file mode 100644 index 0000000..cebb3b5 --- /dev/null +++ b/src/health.jl @@ -0,0 +1,100 @@ +mutable struct Deputy + ready::Bool + shutting_down::Bool + shutdown_handler::Any + shutdown_handler_timeout::Second +end + +""" + Deputy(; shutdown_handler=nothing, shutdown_handler_timeout::Period=Second(5)) + +Construct an application `Deputy` which provides health check endpoints. + +## Keywords + +- `shutdown_handler` (optional): A zero-argument function which allows the user to provide + a custom callback function for when `shutdown!(::Deputy)` is called. +- `shutdown_handler_timeout::Period` (optional): Specifies the maximum execution duration of + a `shutdown_handler`. +""" +function Deputy(; shutdown_handler=nothing, shutdown_handler_timeout::Period=Second(5)) + return Deputy(false, false, shutdown_handler, shutdown_handler_timeout) +end + +""" + readied!(deputy::Deputy) -> Nothing + +Mark the application as "ready". Sets the readiness endpoint to respond with successful +responses. 
+""" +function readied!(deputy::Deputy) + deputy.ready = true + return nothing +end + +""" + shutdown!(deputy::Deputy) -> Nothing + +Initiates a shutdown of the application by: + +1. Mark the application as shutting down ("non-live"). +2. Executing the deputy's `shutdown_handler` (if defined). +3. Exiting the current Julia process. + +If a `deputy.shutdown_handler` is defined it must complete within the +`deputy.shutdown_handler_timeout` or a warning will be logged and the Julia process will +immediately exit. Any exceptions that occur in the `deputy.shutdown_handler` will also be +logged and result in the Julia process exiting. + +A `shutdown_handler` may optionally call `exit` if a user wants to specify the exit status. +By default `shutdown!` uses an exit status of `1`. +""" +function shutdown!(deputy::Deputy) + # Abend if already shutting down + deputy.shutting_down && return nothing + deputy.shutting_down = true + + if !isnothing(deputy.shutdown_handler) + t = @async deputy.shutdown_handler() + + # Ensure the shutdown handler completes on-time and without exceptions + status = timedwait(deputy.shutdown_handler_timeout; pollint=Second(1)) do + return istaskdone(t) + end + + if istaskfailed(t) + @error "Shutdown handler failed" exception = TaskFailedException(t) + elseif status === :timed_out + @warn "Shutdown handler still running after $(deputy.shutdown_handler_timeout)" + end + end + + # Normally `shutdown!` is responsible for exiting the Julia process. However, a + # user-defined `shutdown_handler` may call `exit` but must do so prior before reaching + # the timeout. 
+ @mock exit(1) + + return nothing +end + +function liveness_endpoint(deputy::Deputy) + return function (r::HTTP.Request) + @debug "liveness probed" + return if !deputy.shutting_down + HTTP.Response(200) + else + HTTP.Response(503) + end + end +end + +function readiness_endpoint(deputy::Deputy) + return function (r::HTTP.Request) + @debug "readiness probed" + return if deputy.ready + HTTP.Response(200) + else + HTTP.Response(503) + end + end +end diff --git a/src/server.jl b/src/server.jl new file mode 100644 index 0000000..66b9a2b --- /dev/null +++ b/src/server.jl @@ -0,0 +1,33 @@ +const DEFAULT_PORT = 8081 + +function _default_port() + name = "DEPUTY_HEALTH_CHECK_PORT" + return haskey(ENV, name) ? parse(Int, ENV[name]) : DEFAULT_PORT +end + +""" + K8sDeputy.serve!(deputy::Deputy, [host], [port::Integer]) -> HTTP.Server + +Starts a non-blocking `HTTP.Server` responding to requests to `deputy` health checks. The +following health check endpoints are available: + +- `/health/live`: Is the server alive/running? +- `/health/ready`: Is the server ready (has `readied!(deputy)` been called)? + +These endpoints will respond with HTTP status `200 OK` on success or +`503 Service Unavailable` on failure. + +## Arguments + +- `host` (optional): The address to listen on for incoming requests. Defaults to + `Sockets.localhost`. +- `port::Integer` (optional): The port to listen on. Defaults to the port number specified + by the environment variable `DEPUTY_HEALTH_CHECK_PORT`, otherwise `8081`. 
+""" +function serve!(deputy::Deputy, host=localhost, port::Integer=_default_port()) + router = HTTP.Router() + HTTP.register!(router, "/health/live", liveness_endpoint(deputy)) + HTTP.register!(router, "/health/ready", readiness_endpoint(deputy)) + + return HTTP.serve!(router, host, port) +end diff --git a/test/graceful_termination.jl b/test/graceful_termination.jl new file mode 100644 index 0000000..075ae91 --- /dev/null +++ b/test/graceful_termination.jl @@ -0,0 +1,74 @@ +@testset "graceful_terminator" begin + @testset "Julia entrypoint" begin + code = quote + using K8sDeputy + atexit(() -> @info "SHUTDOWN COMPLETE") + graceful_terminator() do + @info "GRACEFUL TERMINATION HANDLER" + exit(2) + return nothing + end + sleep(60) + end + + cmd = `$(Base.julia_cmd()) --color=no -e $code` + buffer = IOBuffer() + p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false) + @test timedwait(() -> process_running(p), Second(5)) === :ok + + # Allow some time for Julia to start up and the graceful terminator to be registered. + sleep(3) + + # When no PID is passed in, the process ID is read from the Julia entrypoint file. + # Blocks until the process terminates. 
+ @test graceful_terminate() === nothing + + @test process_exited(p) + @test p.exitcode == 2 + + output = String(take!(buffer)) + expected = """ + [ Info: GRACEFUL TERMINATION HANDLER + [ Info: SHUTDOWN COMPLETE + """ + @test output == expected + end + + @testset "multiple Julia processes" begin + code = quote + using K8sDeputy + atexit(() -> @info "SHUTDOWN COMPLETE") + graceful_terminator(; set_entrypoint=false) do + @info "GRACEFUL TERMINATION HANDLER" + exit(2) + return nothing + end + sleep(60) + end + + cmd = `$(Base.julia_cmd()) --color=no -e $code` + buffer1 = IOBuffer() + buffer2 = IOBuffer() + p1 = run(pipeline(cmd; stdout=buffer1, stderr=buffer1); wait=false) + p2 = run(pipeline(cmd; stdout=buffer2, stderr=buffer2); wait=false) + @test timedwait(() -> process_running(p1) && process_running(p2), Second(5)) === :ok + + # Allow some time for Julia to start up and the graceful terminator to be registered. + sleep(3) + + # Blocks until the process terminates + @test graceful_terminate(getpid(p1)) === nothing + @test graceful_terminate(getpid(p2)) === nothing + @test process_exited(p1) + @test process_exited(p2) + + output1 = String(take!(buffer1)) + output2 = String(take!(buffer2)) + expected = """ + [ Info: GRACEFUL TERMINATION HANDLER + [ Info: SHUTDOWN COMPLETE + """ + @test output1 == expected + @test output2 == expected + end +end diff --git a/test/health.jl b/test/health.jl new file mode 100644 index 0000000..8d45143 --- /dev/null +++ b/test/health.jl @@ -0,0 +1,236 @@ +function exit_patcher(rc::Ref{Int}) + atexit_hooks = [] + return [@patch Base.atexit(f) = push!(atexit_hooks, f) + @patch function Base.exit(n) + rc[] = n + while !isempty(atexit_hooks) + pop!(atexit_hooks)() + end + end] +end + +@testset "Deputy" begin + @testset "basic" begin + deputy = Deputy() + @test !deputy.ready + @test !deputy.shutting_down + + readied!(deputy) + @test deputy.ready + @test !deputy.shutting_down + end + + @testset "liveness_endpoint / readiness_endpoint" begin + 
deputy = Deputy() + request = HTTP.Request() + + # Note: Users should not mutate the internal state of a `Deputy` + # TODO: Define `==(x::HTTP.Response, y::HTTP.Response)`. + + deputy.ready = false + r = readiness_endpoint(deputy)(request) + @test r.status == 503 + @test isempty(String(r.body)) + + deputy.ready = true + r = readiness_endpoint(deputy)(request) + @test r.status == 200 + @test isempty(String(r.body)) + + deputy.shutting_down = false + r = liveness_endpoint(deputy)(request) + @test r.status == 200 + @test isempty(String(r.body)) + + deputy.shutting_down = true + r = liveness_endpoint(deputy)(request) + @test r.status == 503 + @test isempty(String(r.body)) + end + + # Note: If a non-mocked `exit(0)` is called it may appear that all tests have passed. + @testset "shutdown!" begin + @testset "default handler" begin + deputy = Deputy() + + rc = Ref{Int}() + logs = [(:info, "SHUTDOWN COMPLETE")] + @test_logs(logs..., + apply(exit_patcher(rc)) do + @mock atexit(() -> @info "SHUTDOWN COMPLETE") + return shutdown!(deputy) + end) + + @test isassigned(rc) + @test rc[] == 1 + end + + @testset "custom handler" begin + deputy = nothing + + shutdown_handler = function () + @info "SHUTDOWN HANDLER" + @info "shutting_down = $(deputy.shutting_down)" + end + + deputy = Deputy(; shutdown_handler) + + rc = Ref{Int}() + logs = [(:info, "SHUTDOWN HANDLER"), + (:info, "shutting_down = true"), + (:info, "SHUTDOWN COMPLETE")] + @test_logs(logs..., + apply(exit_patcher(rc)) do + @mock atexit(() -> @info "SHUTDOWN COMPLETE") + return shutdown!(deputy) + end) + + @test isassigned(rc) + @test rc[] == 1 + end + + @testset "handler exception" begin + shutdown_handler = () -> error("failure") + deputy = Deputy(; shutdown_handler) + + rc = Ref{Int}() + logs = [(:error, "Shutdown handler failed"), + (:info, "SHUTDOWN COMPLETE")] + @test_logs(logs..., + apply(exit_patcher(rc)) do + @mock atexit(() -> @info "SHUTDOWN COMPLETE") + return shutdown!(deputy) + end) + + @test isassigned(rc) + 
@test rc[] == 1 + end + + @testset "timeout" begin + shutdown_handler = function () + @info "SHUTDOWN HANDLER" + sleep(10) + @info "SHOULD NEVER BE SEEN" + return nothing + end + + deputy = Deputy(; shutdown_handler, shutdown_handler_timeout=Second(1)) + + rc = Ref{Int}() + logs = [(:info, "SHUTDOWN HANDLER"), + (:warn, "Shutdown handler still running after 1 second"), + (:info, "SHUTDOWN COMPLETE")] + @test_logs(logs..., + apply(exit_patcher(rc)) do + @mock atexit(() -> @info "SHUTDOWN COMPLETE") + return shutdown!(deputy) + end) + + @test isassigned(rc) + @test rc[] == 1 + end + + @testset "exit" begin + code = quote + using K8sDeputy, Dates + + shutdown_handler() = @info "SHUTDOWN HANDLER" + atexit(() -> @info "SHUTDOWN COMPLETE") + + deputy = Deputy(; shutdown_handler, shutdown_handler_timeout=Second(1)) + shutdown!(deputy) + end + + cmd = `$(Base.julia_cmd()) --color=no -e $code` + buffer = IOBuffer() + p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false) + + @test timedwait(() -> process_exited(p), Second(10)) === :ok + @test p.exitcode == 1 + + output = String(take!(buffer)) + expected = """ + [ Info: SHUTDOWN HANDLER + [ Info: SHUTDOWN COMPLETE + """ + @test output == expected + end + end + + @testset "serve!" begin + deputy = Deputy() + port = rand(EPHEMERAL_PORT_RANGE) + server = serve!(deputy, localhost, port) + + try + r = HTTP.get("http://$localhost:$port/health/ready"; status_exception=false) + @test r.status == 503 + + r = HTTP.get("http://$localhost:$port/health/live") + @test r.status == 200 + + readied!(deputy) + + r = HTTP.get("http://$localhost:$port/health/ready") + @test r.status == 200 + + r = HTTP.get("http://$localhost:$port/health/live") + @test r.status == 200 + + # Faking shutting down. Normal usage would call `shutdown!` but we don't want to + # terminate our test process. 
+ deputy.shutting_down = true + + r = HTTP.get("http://$localhost:$port/health/ready") + @test r.status == 200 + + r = HTTP.get("http://$localhost:$port/health/live"; status_exception=false) + @test r.status == 503 + finally + close(server) + end + end + + @testset "graceful termination" begin + port = rand(EPHEMERAL_PORT_RANGE) + code = quote + using K8sDeputy, Sockets + + shutdown_handler() = @info "SHUTDOWN HANDLER" + atexit(() -> @info "SHUTDOWN COMPLETE") + + deputy = Deputy(; shutdown_handler) + graceful_terminator(; set_entrypoint=false) do + @info "GRACEFUL TERMINATION HANDLER" + shutdown!(deputy) + return nothing + end + K8sDeputy.serve!(deputy, Sockets.localhost, $port) + readied!(deputy) + sleep(60) + end + + cmd = `$(Base.julia_cmd()) --color=no -e $code` + buffer = IOBuffer() + p = run(pipeline(cmd; stdout=buffer, stderr=buffer); wait=false) + @test timedwait(() -> process_running(p), Second(5)) === :ok + @test timedwait(Second(10)) do + r = HTTP.get("http://$localhost:$port/health/ready"; status_exception=false) + return r.status == 200 + end === :ok + + # Blocks until the process terminates + graceful_terminate(getpid(p)) + @test process_exited(p) + @test p.exitcode == 1 + + output = String(take!(buffer)) + expected = """ + [ Info: Listening on: $localhost:$port, thread id: 1 + [ Info: GRACEFUL TERMINATION HANDLER + [ Info: SHUTDOWN HANDLER + [ Info: SHUTDOWN COMPLETE + """ + @test output == expected + end +end diff --git a/test/runtests.jl b/test/runtests.jl new file mode 100644 index 0000000..af18fe9 --- /dev/null +++ b/test/runtests.jl @@ -0,0 +1,22 @@ +using Aqua: Aqua +using Dates: Second +using K8sDeputy +using K8sDeputy: readiness_endpoint, liveness_endpoint, serve! 
+using HTTP: HTTP +using Mocking: Mocking, @mock, @patch, apply +using Sockets: localhost +using Test + +# https://en.wikipedia.org/wiki/Ephemeral_port +const EPHEMERAL_PORT_RANGE = 49152:65535 + +Mocking.activate() + +@testset "K8sDeputy.jl" begin + @testset "Aqua" begin + Aqua.test_all(K8sDeputy; ambiguities=false) + end + + include("graceful_termination.jl") + include("health.jl") +end