diff --git a/Dockerfile b/Dockerfile index ad7274f..107e962 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build stage -FROM golang:1.24-alpine AS build +FROM golang:1.25-alpine AS build # Install git and build dependencies RUN apk add --no-cache git diff --git a/cmd/bench/bench_test.go b/cmd/bench/bench_test.go index 4b1c3d2..a10cfeb 100644 --- a/cmd/bench/bench_test.go +++ b/cmd/bench/bench_test.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "io" + "log/slog" "math/rand" "net" "net/http" @@ -93,6 +94,7 @@ var ( func TestMain(m *testing.M) { flag.Parse() + slog.SetDefault(slog.New(slog.NewTextHandler(io.Discard, nil))) nodes := make([]cacheNode, *numNodes) nodes[0] = startNode("node0", "") diff --git a/cmd/server/main.go b/cmd/server/main.go index d23e8a4..605fecf 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -75,7 +75,7 @@ func main() { mux.HandleFunc("/kv/", func(w http.ResponseWriter, req *http.Request) { op := methodToOp(req.Method) // "get" | "put" | "post" | "delete" | "other" telemetry.Instrument(op, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - slog.Info("Received HTTP Request") + slog.Info("Received HTTP KV Request", "type", r.Method) switch r.Method { case http.MethodPut, http.MethodPost: n.Put(w, r) @@ -88,6 +88,17 @@ func main() { } })).ServeHTTP(w, req) }) + mux.HandleFunc("/replica/", func(w http.ResponseWriter, req *http.Request) { + slog.Info("Received HTTP Replica Request", "type", req.Method) + switch req.Method { + case http.MethodPut, http.MethodPost: + n.PutReplica(w, req) + case http.MethodDelete: + n.DelReplica(w, req) + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } + }) addr = ":8080" slog.Info("ZephyrCache node listening on", "addr", addr) diff --git a/deploy/docker-compose.gossip.yml b/deploy/docker-compose.gossip.yml index 47113a5..90103da 100644 --- a/deploy/docker-compose.gossip.yml +++ b/deploy/docker-compose.gossip.yml @@ -6,7 +6,9 @@ services: dockerfile: Dockerfile environment: - DISCOVERY=gossip - - LOG_LEVEL=warn + - SELF_ID=seed + - SELF_ADDR=seed + - LOG_LEVEL=info ports: - "8080:8080" deploy: diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml index 48067ef..6b5951a 100644 --- a/deploy/docker-compose.yml +++ b/deploy/docker-compose.yml @@ -5,7 +5,7 @@ services: context: .. dockerfile: Dockerfile environment: - - LOG_LEVEL=warn + - LOG_LEVEL=info ports: - "8080-8179:8080" deploy: diff --git a/go.mod b/go.mod index 10becd4..814411e 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/ryandielhenn/zephyrcache -go 1.23.0 +go 1.25.0 require ( github.com/prometheus/client_golang v1.19.0 @@ -22,9 +22,9 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.6.4 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/net v0.38.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/net v0.52.0 // indirect + golang.org/x/sys v0.42.0 // indirect + golang.org/x/text v0.35.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect google.golang.org/grpc v1.71.1 // indirect diff --git a/go.sum b/go.sum index 5cdd596..0f74307 100644 --- a/go.sum +++ b/go.sum @@ -72,20 +72,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= +golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= +golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= +golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= diff --git a/pkg/node/etcd_handlers.go b/pkg/node/etcd_handlers.go index cc334e8..58df4c7 100644 --- a/pkg/node/etcd_handlers.go +++ b/pkg/node/etcd_handlers.go @@ -46,7 +46,7 @@ func WatchPeers(node *Node, cli *clientv3.Client) { normalizedPeers[id] = NormalizeHostPort(addr, "8080") } node.syncPeers(normalizedPeers) - slog.Info("[WatchPeers Callback] synced %d peers\n", len(peers)) + slog.Info("[WatchPeers Callback] synced", "peers", len(peers)) }) slog.Info("[BOOT] after WatchPeers") } diff --git a/pkg/node/http_handlers.go b/pkg/node/http_handlers.go index 3abdc2d..3d7d733 100644 --- a/pkg/node/http_handlers.go +++ b/pkg/node/http_handlers.go @@ -1,12 +1,15 @@ package node import ( + "bytes" "encoding/json" + "fmt" "io" "log/slog" "net/http" "os" "strconv" + "sync" "time" ) @@ -72,38 +75,100 @@ func (s *Node) Forward(w http.ResponseWriter, req *http.Request, owner string) { } -// put adds a key/value pair +func readBody(req *http.Request) ([]byte, error) { + b, err := io.ReadAll(req.Body) + if err != nil && err.Error() != "EOF" { + return nil, err + } + return b, nil +} + +func parseTTL(req *http.Request) (time.Duration, error) { + ttlStr := req.URL.Query().Get("ttl") + if ttlStr == "" { + return 0, nil + } + sec, err := strconv.Atoi(ttlStr) + if err != nil { + return 0, fmt.Errorf("invalid ttl") + } + return time.Duration(sec) * time.Second, nil +} + +// put adds a key/value pair, writing to all replicas. func (n *Node) Put(w http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/kv/"):] - owner, self, ok := n.OwnerForKey(key) - if !ok { - http.Error(w, "no owner for key", http.StatusServiceUnavailable) + + body, err := readBody(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - if owner != self { - slog.Info("[Forward PUT]", "key", key, "owner", owner, "self", self) - n.Forward(w, req, owner) + ttl, err := parseTTL(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - // handle local case - val, err := io.ReadAll(req.Body) - if err != nil && err.Error() != "EOF" { + replicaAddrs := n.ReplicasForKey(key, 3) + if len(replicaAddrs) == 0 { + http.Error(w, "no owner or replicas for key", http.StatusServiceUnavailable) + return + } + + selfAddr := NormalizeHostPort(n.addr, "8080") + var wg sync.WaitGroup + for _, repAddr := range replicaAddrs { + wg.Add(1) + go func(repAddr string) { + defer wg.Done() + if repAddr == selfAddr { + slog.Info("[Writing Replica]", "url", req.URL.Path, "self addr", selfAddr) + n.kv.Put(key, body, ttl) + } else { + slog.Info("[Forward PUT]", "key", key, "replica", repAddr, "self", selfAddr) + replicaURL := "http://" + repAddr + "/replica/" + key + if q := req.URL.RawQuery; q != "" { + replicaURL += "?" + q + } + repReq, err := http.NewRequestWithContext(req.Context(), http.MethodPut, replicaURL, bytes.NewReader(body)) + if err != nil { + slog.Warn("error building replication request", "err", err) + return + } + resp, err := http.DefaultClient.Do(repReq) + if err != nil { + slog.Warn("error forwarding to replica", "err", err, "replica", repAddr) + return + } + resp.Body.Close() + } + }(repAddr) + } + + wg.Wait() + w.WriteHeader(http.StatusNoContent) +} + +// putReplica writes the key/value pair to the local store (called by the primary). +func (n *Node) PutReplica(w http.ResponseWriter, req *http.Request) { + key := req.URL.Path[len("/replica/"):] + slog.Info("[Writing Replica]", "url", req.URL.Path, "self id", n.addr) + + body, err := readBody(req) + if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - var ttl time.Duration - if ttlStr := req.URL.Query().Get("ttl"); ttlStr != "" { - sec, err := strconv.Atoi(ttlStr) - if err != nil { - http.Error(w, "invalid ttl", http.StatusBadRequest) - return - } - ttl = time.Duration(sec) * time.Second + ttl, err := parseTTL(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return } - n.kv.Put(key, val, ttl) + + n.kv.Put(key, body, ttl) w.WriteHeader(http.StatusNoContent) } @@ -132,22 +197,56 @@ func (n *Node) Get(w http.ResponseWriter, req *http.Request) { w.Write(val) } +func (n *Node) DelReplica(w http.ResponseWriter, req *http.Request) { + key := req.URL.Path[len("/replica/"):] + slog.Info("[Deleting Replica]", "url", req.URL.Path, "self id", n.addr) + n.kv.Delete(key) + w.WriteHeader(http.StatusNoContent) +} + // del removes a key func (n *Node) Del(w http.ResponseWriter, req *http.Request) { key := req.URL.Path[len("/kv/"):] - owner, self, ok := n.OwnerForKey(key) - if !ok { - http.Error(w, "no owner for key", http.StatusServiceUnavailable) + replicaAddrs := n.ReplicasForKey(key, 3) + if len(replicaAddrs) == 0 { + http.Error(w, "no owner or replicas for key", http.StatusServiceUnavailable) return } - if owner != self { - slog.Info("[Forward DEL]", "key", key, "owner", owner, "self", self) - n.Forward(w, req, owner) + body, err := readBody(req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - // handle local case - n.kv.Delete(key) + selfAddr := NormalizeHostPort(n.addr, "8080") + var wg sync.WaitGroup + for _, addr := range replicaAddrs { + wg.Add(1) + go func(repAddr string) { + defer wg.Done() + if repAddr == selfAddr { + // handle local case + slog.Info("[Deleting Replica]", "url", req.URL.Path, "self addr", selfAddr) + n.kv.Delete(key) + } else { + slog.Info("[Forward DEL]", "key", key, "replica", repAddr, "self", selfAddr) + replicaURL := "http://" + repAddr + "/replica/" + key + + repReq, err := http.NewRequestWithContext(req.Context(), http.MethodDelete, replicaURL, bytes.NewReader(body)) + if err != nil { + slog.Warn("error building delete replica request", "err", err) + return + } + resp, err := http.DefaultClient.Do(repReq) + if err != nil { + slog.Warn("error forwarding delete to replica", "err", err, "replica", repAddr) + return + } + resp.Body.Close() + } + }(addr) + } + wg.Wait() w.WriteHeader(http.StatusNoContent) } diff --git a/pkg/node/utils.go b/pkg/node/utils.go index 2bd3458..2ead879 100644 --- a/pkg/node/utils.go +++ b/pkg/node/utils.go @@ -38,3 +38,19 @@ func (s *Node) OwnerForKey(key string) (ownerHP, selfHP string, ok bool) { } return NormalizeHostPort(ownerAddr, "8080"), NormalizeHostPort(s.addr, "8080"), true } + +// replicas looks up the replicas for a key and normalizes their addresses +func (n *Node) ReplicasForKey(key string, replicas int) (replicaAddrs []string) { + replicaIds := n.ring.LookupN([]byte(key), replicas) // e.g. "Node3" + + addrs := make([]string, len(replicaIds)) + for i := range len(replicaIds) { + addr, ok := n.ring.Addr(replicaIds[i]) // e.g. "Node3:8080" (what you stored) + if !ok || addr == "" { + return nil + } + addrs[i] = NormalizeHostPort(addr, "8080") + + } + return addrs +}