Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build stage
FROM golang:1.24-alpine AS build
FROM golang:1.25-alpine AS build

# Install git and build dependencies
RUN apk add --no-cache git
Expand Down
2 changes: 2 additions & 0 deletions cmd/bench/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"io"
"log/slog"
"math/rand"
"net"
"net/http"
Expand Down Expand Up @@ -93,6 +94,7 @@ var (

func TestMain(m *testing.M) {
flag.Parse()
slog.SetDefault(slog.New(slog.NewTextHandler(io.Discard, nil)))

nodes := make([]cacheNode, *numNodes)
nodes[0] = startNode("node0", "")
Expand Down
13 changes: 12 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func main() {
mux.HandleFunc("/kv/", func(w http.ResponseWriter, req *http.Request) {
op := methodToOp(req.Method) // "get" | "put" | "post" | "delete" | "other"
telemetry.Instrument(op, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
slog.Info("Received HTTP Request")
slog.Info("Received HTTP KV Request", "type", r.Method)
switch r.Method {
case http.MethodPut, http.MethodPost:
n.Put(w, r)
Expand All @@ -88,6 +88,17 @@ func main() {
}
})).ServeHTTP(w, req)
})
mux.HandleFunc("/replica/", func(w http.ResponseWriter, req *http.Request) {
slog.Info("Received HTTP Replica Request", "type", req.Method)
switch req.Method {
case http.MethodPut, http.MethodPost:
n.PutReplica(w, req)
case http.MethodDelete:
n.DelReplica(w, req)
default:
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
}
})

addr = ":8080"
slog.Info("ZephyrCache node listening on", "addr", addr)
Expand Down
4 changes: 3 additions & 1 deletion deploy/docker-compose.gossip.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ services:
dockerfile: Dockerfile
environment:
- DISCOVERY=gossip
- LOG_LEVEL=warn
- SELF_ID=seed
- SELF_ADDR=seed
- LOG_LEVEL=info
ports:
- "8080:8080"
deploy:
Expand Down
2 changes: 1 addition & 1 deletion deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ services:
context: ..
dockerfile: Dockerfile
environment:
- LOG_LEVEL=warn
- LOG_LEVEL=info
ports:
- "8080-8179:8080"
deploy:
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/ryandielhenn/zephyrcache

go 1.23.0
go 1.25.0

require (
github.com/prometheus/client_golang v1.19.0
Expand All @@ -22,9 +22,9 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.6.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.35.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
google.golang.org/grpc v1.71.1 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,20 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/node/etcd_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func WatchPeers(node *Node, cli *clientv3.Client) {
normalizedPeers[id] = NormalizeHostPort(addr, "8080")
}
node.syncPeers(normalizedPeers)
slog.Info("[WatchPeers Callback] synced %d peers\n", len(peers))
slog.Info("[WatchPeers Callback] synced", "peers", len(peers))
})
slog.Info("[BOOT] after WatchPeers")
}
153 changes: 126 additions & 27 deletions pkg/node/http_handlers.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package node

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strconv"
"sync"
"time"
)

Expand Down Expand Up @@ -72,38 +75,100 @@ func (s *Node) Forward(w http.ResponseWriter, req *http.Request, owner string) {

}

// put adds a key/value pair
func readBody(req *http.Request) ([]byte, error) {
b, err := io.ReadAll(req.Body)
if err != nil && err.Error() != "EOF" {
return nil, err
}
return b, nil
}

func parseTTL(req *http.Request) (time.Duration, error) {
ttlStr := req.URL.Query().Get("ttl")
if ttlStr == "" {
return 0, nil
}
sec, err := strconv.Atoi(ttlStr)
if err != nil {
return 0, fmt.Errorf("invalid ttl")
}
return time.Duration(sec) * time.Second, nil
}

// put adds a key/value pair, writing to all replicas.
func (n *Node) Put(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/kv/"):]
owner, self, ok := n.OwnerForKey(key)
if !ok {
http.Error(w, "no owner for key", http.StatusServiceUnavailable)

body, err := readBody(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if owner != self {
slog.Info("[Forward PUT]", "key", key, "owner", owner, "self", self)
n.Forward(w, req, owner)
ttl, err := parseTTL(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// handle local case
val, err := io.ReadAll(req.Body)
if err != nil && err.Error() != "EOF" {
replicaAddrs := n.ReplicasForKey(key, 3)
if len(replicaAddrs) == 0 {
http.Error(w, "no owner or replicas for key", http.StatusServiceUnavailable)
return
}

selfAddr := NormalizeHostPort(n.addr, "8080")
var wg sync.WaitGroup
for _, repAddr := range replicaAddrs {
wg.Add(1)
go func(repAddr string) {
defer wg.Done()
if repAddr == selfAddr {
slog.Info("[Writing Replica]", "url", req.URL.Path, "self addr", selfAddr)
n.kv.Put(key, body, ttl)
} else {
slog.Info("[Forward PUT]", "key", key, "replica", repAddr, "self", selfAddr)
replicaURL := "http://" + repAddr + "/replica/" + key
if q := req.URL.RawQuery; q != "" {
replicaURL += "?" + q
}
repReq, err := http.NewRequestWithContext(req.Context(), http.MethodPut, replicaURL, bytes.NewReader(body))
if err != nil {
slog.Warn("error building replication request", "err", err)
return
}
resp, err := http.DefaultClient.Do(repReq)
if err != nil {
slog.Warn("error forwarding to replica", "err", err, "replica", repAddr)
return
}
resp.Body.Close()
}
}(repAddr)
}

wg.Wait()
w.WriteHeader(http.StatusNoContent)
}

// putReplica writes the key/value pair to the local store (called by the primary).
func (n *Node) PutReplica(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/replica/"):]
slog.Info("[Writing Replica]", "url", req.URL.Path, "self id", n.addr)

body, err := readBody(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var ttl time.Duration
if ttlStr := req.URL.Query().Get("ttl"); ttlStr != "" {
sec, err := strconv.Atoi(ttlStr)
if err != nil {
http.Error(w, "invalid ttl", http.StatusBadRequest)
return
}
ttl = time.Duration(sec) * time.Second
ttl, err := parseTTL(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
n.kv.Put(key, val, ttl)

n.kv.Put(key, body, ttl)
w.WriteHeader(http.StatusNoContent)
}

Expand Down Expand Up @@ -132,22 +197,56 @@ func (n *Node) Get(w http.ResponseWriter, req *http.Request) {
w.Write(val)
}

func (n *Node) DelReplica(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/replica/"):]
slog.Info("[Deleting Replica]", "url", req.URL.Path, "self id", n.addr)
n.kv.Delete(key)
w.WriteHeader(http.StatusNoContent)
}

// del removes a key
func (n *Node) Del(w http.ResponseWriter, req *http.Request) {
key := req.URL.Path[len("/kv/"):]
owner, self, ok := n.OwnerForKey(key)
if !ok {
http.Error(w, "no owner for key", http.StatusServiceUnavailable)
replicaAddrs := n.ReplicasForKey(key, 3)
if len(replicaAddrs) == 0 {
http.Error(w, "no owner or replicas for key", http.StatusServiceUnavailable)
return
}

if owner != self {
slog.Info("[Forward DEL]", "key", key, "owner", owner, "self", self)
n.Forward(w, req, owner)
body, err := readBody(req)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// handle local case
n.kv.Delete(key)
selfAddr := NormalizeHostPort(n.addr, "8080")
var wg sync.WaitGroup
for _, addr := range replicaAddrs {
wg.Add(1)
go func(repAddr string) {
defer wg.Done()
if repAddr == selfAddr {
// handle local case
slog.Info("[Deleting Replica]", "url", req.URL.Path, "self addr", selfAddr)
n.kv.Delete(key)
} else {
slog.Info("[Forward DEL]", "key", key, "replica", repAddr, "self", selfAddr)
replicaURL := "http://" + repAddr + "/replica/" + key

repReq, err := http.NewRequestWithContext(req.Context(), http.MethodDelete, replicaURL, bytes.NewReader(body))
if err != nil {
slog.Warn("error building delete replica request", "err", err)
return
}
resp, err := http.DefaultClient.Do(repReq)
if err != nil {
slog.Warn("error forwarding delete to replica", "err", err, "replica", repAddr)
return
}
resp.Body.Close()
}
}(addr)
}
wg.Wait()
w.WriteHeader(http.StatusNoContent)
}
16 changes: 16 additions & 0 deletions pkg/node/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,19 @@ func (s *Node) OwnerForKey(key string) (ownerHP, selfHP string, ok bool) {
}
return NormalizeHostPort(ownerAddr, "8080"), NormalizeHostPort(s.addr, "8080"), true
}

// replicas looks up the replicas for a key and normalizes their addresses
func (n *Node) ReplicasForKey(key string, replicas int) (replicaAddrs []string) {
replicaIds := n.ring.LookupN([]byte(key), replicas) // e.g. "Node3"

addrs := make([]string, len(replicaIds))
for i := range len(replicaIds) {
addr, ok := n.ring.Addr(replicaIds[i]) // e.g. "Node3:8080" (what you stored)
if !ok || addr == "" {
return nil
}
addrs[i] = NormalizeHostPort(addr, "8080")

}
return addrs
}
Loading