Skip to content

Commit

Permalink
fix: JSON formatted logs for all Coordinator and marble-injector mess…
Browse files Browse the repository at this point in the history
…ages (#764)

* Log Coordinator's http internal errors to zap logger
* Use structured logging in marble-injector
* Refactor marble-injector test

---------

Signed-off-by: Daniel Weiße <[email protected]>
  • Loading branch information
daniel-weisse authored Nov 18, 2024
1 parent 1a39a52 commit 47b1793
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 324 deletions.
8 changes: 5 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ add_custom_target(sign-coordinator ALL DEPENDS coordinator-enclave.signed coordi
#

add_custom_target(marble-injector ALL
COMMAND
CGO_ENABLED=0
go build ${TRIMPATH}
-o ${CMAKE_BINARY_DIR}
-buildvcs=false
${CMAKE_COMMAND} -P ${CMAKE_SOURCE_DIR}/build_with_version.cmake
"go" "${PROJECT_VERSION}" "${CMAKE_BINARY_DIR}/marble-injector"
"main"
${TRIMPATH}
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/cmd/marble-injector
)

Expand Down
9 changes: 8 additions & 1 deletion cmd/coordinator/enclavemain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,24 @@ SPDX-License-Identifier: BUSL-1.1
package main

import (
"fmt"
"os"
"path/filepath"

"github.com/edgelesssys/marblerun/coordinator/constants"
"github.com/edgelesssys/marblerun/coordinator/quote/ertvalidator"
"github.com/edgelesssys/marblerun/coordinator/recovery"
"github.com/edgelesssys/marblerun/coordinator/seal"
"github.com/edgelesssys/marblerun/internal/logging"
"github.com/edgelesssys/marblerun/util"
)

func main() {
log := newLogger()
log, err := logging.New()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create logger: %s\n", err)
os.Exit(1)
}
validator := ertvalidator.NewERTValidator(log)
issuer := ertvalidator.NewERTIssuer()
sealDirPrefix := filepath.Join(filepath.FromSlash("/edg"), "hostfs")
Expand Down
10 changes: 9 additions & 1 deletion cmd/coordinator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@ SPDX-License-Identifier: BUSL-1.1
package main

import (
"fmt"
"os"

"github.com/edgelesssys/marblerun/coordinator/constants"
"github.com/edgelesssys/marblerun/coordinator/quote"
"github.com/edgelesssys/marblerun/coordinator/recovery"
"github.com/edgelesssys/marblerun/coordinator/seal"
"github.com/edgelesssys/marblerun/internal/logging"
"github.com/edgelesssys/marblerun/util"
)

func main() {
log := newLogger()
log, err := logging.New()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create logger: %s\n", err)
os.Exit(1)
}
validator := quote.NewFailValidator()
issuer := quote.NewFailIssuer()
sealDir := util.Getenv(constants.SealDir, constants.SealDirDefault())
Expand Down
17 changes: 0 additions & 17 deletions cmd/coordinator/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package main

import (
"context"
"fmt"
"os"
"strings"

Expand Down Expand Up @@ -128,19 +127,3 @@ func run(log *zap.Logger, validator quote.Validator, issuer quote.Issuer, sealDi
func isDevMode() bool {
return util.Getenv(constants.DevMode, constants.DevModeDefault) == "1"
}

func newLogger() *zap.Logger {
var cfg zap.Config
if isDevMode() {
cfg = zap.NewDevelopmentConfig()
} else {
cfg = zap.NewProductionConfig()
cfg.DisableStacktrace = true // Disable stacktraces in production
}
log, err := cfg.Build()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create logger: %s\n", err)
os.Exit(1)
}
return log
}
30 changes: 22 additions & 8 deletions cmd/marble-injector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,20 @@ import (
"crypto/tls"
"flag"
"fmt"
"log"
"net/http"
"os"

"github.com/edgelesssys/marblerun/injector"
"github.com/edgelesssys/marblerun/internal/logging"
"go.uber.org/zap"
)

// Version of the injector.
var Version = "0.0.0" // Don't touch! Automatically injected at build-time.

// GitCommit is the git commit hash.
var GitCommit = "0000000000000000000000000000000000000000" // Don't touch! Automatically injected at build-time.

func main() {
var certFile string
var keyFile string
Expand All @@ -30,12 +38,16 @@ func main() {

flag.Parse()

mux := http.NewServeMux()
w := &injector.Mutator{
CoordAddr: addr,
DomainName: clusterDomain,
SGXResource: sgxResource,
log, err := logging.New()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to create logger: %s\n", err)
os.Exit(1)
}
defer log.Sync() // flushes buffer, if any
log.Info("Starting marble-injector webhook", zap.String("version", Version), zap.String("commit", GitCommit))

mux := http.NewServeMux()
w := injector.New(addr, clusterDomain, sgxResource, log)

mux.HandleFunc("/mutate", w.HandleMutate)

Expand All @@ -46,10 +58,12 @@ func main() {
TLSConfig: &tls.Config{
GetCertificate: loadWebhookCert(certFile, keyFile),
},
ErrorLog: logging.NewWrapper(log),
}

log.Println("Starting Server")
log.Fatal(s.ListenAndServeTLS("", ""))
log.Info("Starting Server")
err = s.ListenAndServeTLS("", "")
log.Fatal("Failed running server", zap.Error(err))
}

// loadWebhookCert loads the certificate and key file for the webhook server.
Expand Down
2 changes: 2 additions & 0 deletions coordinator/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/edgelesssys/marblerun/coordinator/server/handler"
v1 "github.com/edgelesssys/marblerun/coordinator/server/v1"
v2 "github.com/edgelesssys/marblerun/coordinator/server/v2"
mrlogging "github.com/edgelesssys/marblerun/internal/logging"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -115,6 +116,7 @@ func RunClientServer(mux http.Handler, address string, tlsConfig *tls.Config, za
Addr: address,
Handler: mux,
TLSConfig: tlsConfig,
ErrorLog: mrlogging.NewWrapper(zapLogger),
}
zapLogger.Info("Starting client https server", zap.String("address", address))
err := server.ListenAndServeTLS("", "")
Expand Down
77 changes: 44 additions & 33 deletions injector/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"strings"

"github.com/edgelesssys/marblerun/util/k8sutil"
"go.uber.org/zap"
v1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -35,51 +35,62 @@ const (

// Mutator struct.
type Mutator struct {
// CoordAddr contains the address of the MarbleRun coordinator
CoordAddr string
DomainName string
SGXResource string
// coordAddr contains the address of the MarbleRun coordinator
coordAddr string
domainName string
sgxResource string
log *zap.Logger
}

// New creates a new Mutator.
func New(coordAddr, domainName, sgxResource string, log *zap.Logger) *Mutator {
return &Mutator{
coordAddr: coordAddr,
domainName: domainName,
sgxResource: sgxResource,
log: log,
}
}

// HandleMutate handles mutate requests and injects sgx tolerations into the request.
func (m *Mutator) HandleMutate(w http.ResponseWriter, r *http.Request) {
log.Println("Handling mutate request, injecting sgx tolerations")
m.log.Info("Handling mutate request, injecting sgx tolerations")
body := checkRequest(w, r)
if body == nil {
// Error was already written to w
return
}

// mutate the request and add sgx tolerations to pod
mutatedBody, err := mutate(body, m.CoordAddr, m.DomainName, m.SGXResource)
mutatedBody, err := m.mutate(body)
if err != nil {
http.Error(w, fmt.Sprintf("unable to mutate request: %v", err), http.StatusInternalServerError)
http.Error(w, fmt.Sprintf("unable to mutate request: %s", err), http.StatusInternalServerError)
return
}

if _, err := w.Write(mutatedBody); err != nil {
http.Error(w, fmt.Sprintf("unable to write response: %v", err), http.StatusInternalServerError)
http.Error(w, fmt.Sprintf("unable to write response: %s", err), http.StatusInternalServerError)
return
}
}

// mutate handles the creation of json patches for pods.
func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, error) {
func (m *Mutator) mutate(body []byte) ([]byte, error) {
admReviewReq := v1.AdmissionReview{}
if err := json.Unmarshal(body, &admReviewReq); err != nil {
log.Println("Unable to mutate request: invalid admission review")
return nil, fmt.Errorf("invalid admission review: %v", err)
m.log.Error("Unable to mutate request: invalid admission review", zap.Error(err))
return nil, fmt.Errorf("invalid admission review: %w", err)
}

if admReviewReq.Request == nil {
log.Println("Unable to mutate request: empty admission review request")
m.log.Error("Unable to mutate request: empty admission review request")
return nil, errors.New("empty admission request")
}

var pod corev1.Pod
if err := json.Unmarshal(admReviewReq.Request.Object.Raw, &pod); err != nil {
log.Println("Unable to mutate request: invalid pod")
return nil, fmt.Errorf("invalid pod: %v", err)
m.log.Error("Unable to mutate request: invalid pod", zap.Error(err))
return nil, fmt.Errorf("invalid pod: %w", err)
}

// admission response
Expand All @@ -101,11 +112,11 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
marbleType, exists := pod.Labels[labelMarbleType]
if !exists {
// admission request was sent for a pod without marblerun/marbletype label, this should not happen
return generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Error: missing [%s] label, request denied", labelMarbleType))
return m.generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Missing [%s] label, request denied", labelMarbleType))
}
if len(marbleType) <= 0 {
// deny request if the label exists, but is empty
return generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Error: empty [%s] label, request denied", labelMarbleType))
return m.generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Empty [%s] label, request denied", labelMarbleType))
}

injectSgx := false
Expand All @@ -122,15 +133,15 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
newEnvVars := []corev1.EnvVar{
{
Name: envMarbleCoordinatorAddr,
Value: coordAddr,
Value: m.coordAddr,
},
{
Name: envMarbleType,
Value: marbleType,
},
{
Name: envMarbleDNSName,
Value: strings.ToLower(fmt.Sprintf("%s,%s.%s,%s.%s.svc.%s", marbleType, marbleType, namespace, marbleType, namespace, domainName)),
Value: strings.ToLower(fmt.Sprintf("%s,%s.%s,%s.%s.svc.%s", marbleType, marbleType, namespace, marbleType, namespace, m.domainName)),
},
}

Expand Down Expand Up @@ -185,19 +196,19 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
if container.Resources.Limits == nil {
container.Resources.Limits = make(map[corev1.ResourceName]resource.Quantity)
}
switch resourceKey {
switch m.sgxResource {
case k8sutil.IntelEpc.String():
// Intels device plugin offers 3 resources:
// epc : sets EPC for the container
// enclave : provides a handle to /dev/sgx_enclave
// provision : provides a handle to /dev/sgx_provision, this is not needed when the Marble utilises out-of-process quote-generation
setResourceLimit(container.Resources.Limits, k8sutil.IntelEpc, k8sutil.GetEPCResourceLimit(resourceKey))
setResourceLimit(container.Resources.Limits, k8sutil.IntelEpc, k8sutil.GetEPCResourceLimit(m.sgxResource))
setResourceLimit(container.Resources.Limits, k8sutil.IntelEnclave, "1")
setResourceLimit(container.Resources.Limits, k8sutil.IntelProvision, "1")
default:
// Azure and Alibaba Cloud plugins offer only 1 resource
// for custom plugins we can only inject the resource provided by the `resourceKey`
setResourceLimit(container.Resources.Limits, corev1.ResourceName(resourceKey), k8sutil.GetEPCResourceLimit(resourceKey))
setResourceLimit(container.Resources.Limits, corev1.ResourceName(m.sgxResource), k8sutil.GetEPCResourceLimit(m.sgxResource))
}
}

Expand Down Expand Up @@ -227,13 +238,13 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
// inject sgx tolerations if enabled
if injectSgx {
pod.Spec.Tolerations = append(pod.Spec.Tolerations, corev1.Toleration{
Key: resourceKey,
Key: m.sgxResource,
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoSchedule,
})
}

return generateResponse(pod, admReviewReq, admReviewResponse, true, fmt.Sprintf("Mutation request for pod of marble type [%s] successful", marbleType))
return m.generateResponse(pod, admReviewReq, admReviewResponse, true, fmt.Sprintf("Mutation request for pod of marble type [%s] successful", marbleType))
}

// checkRequest verifies the request used was POST and not empty.
Expand All @@ -251,7 +262,7 @@ func checkRequest(w http.ResponseWriter, r *http.Request) []byte {
body, err := io.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
http.Error(w, fmt.Sprintf("unable to read request: %v", err), http.StatusBadRequest)
http.Error(w, fmt.Sprintf("unable to read request: %s", err), http.StatusBadRequest)
return nil
}

Expand Down Expand Up @@ -279,16 +290,16 @@ func setResourceLimit(target map[corev1.ResourceName]resource.Quantity, key core
}

// generateResponse creates the admission response.
func generateResponse(pod corev1.Pod, request, response v1.AdmissionReview, allowed bool, message string) ([]byte, error) {
func (m *Mutator) generateResponse(pod corev1.Pod, request, response v1.AdmissionReview, allowed bool, message string) ([]byte, error) {
marshaledPod, err := json.Marshal(pod)
if err != nil {
log.Println("Error: unable to marshal patched pod")
return nil, fmt.Errorf("unable to marshal patched pod: %v", err)
m.log.Error("Unable to marshal patched pod", zap.Error(err))
return nil, fmt.Errorf("unable to marshal patched pod: %w", err)
}
resp := admission.PatchResponseFromRaw(request.Request.Object.Raw, marshaledPod)
if err := resp.Complete(admission.Request{AdmissionRequest: *request.Request}); err != nil {
log.Println("Error: patching failed")
return nil, fmt.Errorf("patching failed: %v", err)
m.log.Error("Patching failed", zap.Error(err))
return nil, fmt.Errorf("patching failed: %w", err)
}

response.Response = &resp.AdmissionResponse
Expand All @@ -306,11 +317,11 @@ func generateResponse(pod corev1.Pod, request, response v1.AdmissionReview, allo

bytes, err := json.Marshal(response)
if err != nil {
log.Println("Error: unable to marshal admission response")
return nil, fmt.Errorf("unable to marshal admission response: %v", err)
m.log.Error("Unable to marshal admission response", zap.Error(err))
return nil, fmt.Errorf("unable to marshal admission response: %w", err)
}

log.Println(message)
m.log.Info(message)

return bytes, nil
}
Loading

0 comments on commit 47b1793

Please sign in to comment.