diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 1b98730..f440f9f 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -38,7 +38,7 @@ jobs: sudo apt-get install -y upx - name: Run GoReleaser - uses: goreleaser/goreleaser-action@ec59f474b9834571250b370d4735c50f8e2d1e29 # v6 + uses: goreleaser/goreleaser-action@e24998b8b67b290c2fa8b7c14fcfa7de2c5c9b8c # v7.1.0 with: distribution: goreleaser version: latest diff --git a/internal/cli/start.go b/internal/cli/start.go index 0c3f5fa..1085e8f 100644 --- a/internal/cli/start.go +++ b/internal/cli/start.go @@ -305,6 +305,10 @@ listen: %q # Optional: enable the HTTP/REST gateway on a separate port. # When set, REST clients can call all gRPC methods via JSON over HTTP. # Requires the x-ledger-store and x-api-key HTTP headers (same as gRPC). +# Note: the gateway serves plain HTTP even when gRPC is configured with TLS. +# Use a reverse proxy (nginx, Caddy) in front if HTTPS termination is needed. +# Note: bytes fields (e.g. payload) are base64-encoded in JSON responses. +# Note: changing this value requires a full restart (SIGHUP only reloads hooks). # http_listen: "localhost:8080" # Optional: write daemon logs to a file instead of stderr. diff --git a/internal/server/gateway.go b/internal/server/gateway.go index 66277e8..07baebb 100644 --- a/internal/server/gateway.go +++ b/internal/server/gateway.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "net" "net/http" "os" "strings" @@ -23,7 +24,10 @@ import ( // // The x-ledger-store and x-api-key HTTP headers are forwarded as gRPC metadata // so the existing server-side auth interceptors handle authentication. -func newGatewayHandler(ctx context.Context, grpcAddr string, cfg *config.Config) (http.Handler, error) { +// +// Note: bytes fields (e.g. Entry.payload) are base64-encoded in JSON responses +// per the protobuf JSON mapping specification. +func newGatewayHandler(grpcAddr string, cfg *config.Config) (http.Handler, error) { dialOpts, err := gatewayDialOpts(cfg) if err != nil { return nil, fmt.Errorf("gateway: dial options: %w", err) @@ -32,7 +36,11 @@ func newGatewayHandler(ctx context.Context, grpcAddr string, cfg *config.Config) mux := runtime.NewServeMux( runtime.WithIncomingHeaderMatcher(gatewayHeaderMatcher), ) - if err := ledgerv1.RegisterLedgerServiceHandlerFromEndpoint(ctx, mux, grpcAddr, dialOpts); err != nil { + // Use context.Background so the loopback connection lives for the server's + // lifetime and is not affected by short-lived caller contexts. + if err := ledgerv1.RegisterLedgerServiceHandlerFromEndpoint( + context.Background(), mux, grpcAddr, dialOpts, + ); err != nil { return nil, fmt.Errorf("gateway: register: %w", err) } return mux, nil @@ -42,12 +50,28 @@ func newGatewayHandler(ctx context.Context, grpcAddr string, cfg *config.Config) // metadata in addition to the default grpc-gateway headers. func gatewayHeaderMatcher(key string) (string, bool) { switch strings.ToLower(key) { - case ledgerpb.StoreMetadataHeader, ledgerpb.APIKeyMetadataHeader: - return key, true + case ledgerpb.StoreMetadataHeader: + return ledgerpb.StoreMetadataHeader, true + case ledgerpb.APIKeyMetadataHeader: + return ledgerpb.APIKeyMetadataHeader, true } return runtime.DefaultHeaderMatcher(key) } +// gatewayDialTarget returns the gRPC dial target for the loopback gateway +// connection. When the bound address uses a wildcard host (0.0.0.0 or ::), +// it is rewritten to 127.0.0.1 so traffic stays on the loopback interface. +func gatewayDialTarget(boundAddr string) string { + host, port, err := net.SplitHostPort(boundAddr) + if err != nil { + return boundAddr + } + if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() { + host = "127.0.0.1" + } + return net.JoinHostPort(host, port) +} + // gatewayDialOpts builds the gRPC dial options for the gateway→gRPC loopback. // When TLS is configured on the gRPC server the gateway uses matching credentials; // otherwise it dials plaintext. diff --git a/internal/server/gateway_internal_test.go b/internal/server/gateway_internal_test.go new file mode 100644 index 0000000..8216edd --- /dev/null +++ b/internal/server/gateway_internal_test.go @@ -0,0 +1,55 @@ +package server + +import "testing" + +func TestGatewayDialTarget(t *testing.T) { + tests := []struct { + name string + input string + want string + }{ + {"ipv4 wildcard", "0.0.0.0:50051", "127.0.0.1:50051"}, + {"ipv6 wildcard", "[::]:50051", "127.0.0.1:50051"}, + {"specific ipv4", "127.0.0.1:50051", "127.0.0.1:50051"}, + {"named host", "localhost:50051", "localhost:50051"}, + {"non-loopback ip", "10.0.0.1:50051", "10.0.0.1:50051"}, + {"malformed passthrough", "not-a-hostport", "not-a-hostport"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := gatewayDialTarget(tt.input) + if got != tt.want { + t.Errorf("gatewayDialTarget(%q) = %q, want %q", tt.input, got, tt.want) + } + }) + } +} + +func TestGatewayHeaderMatcher(t *testing.T) { + tests := []struct { + input string + wantKey string + wantPass bool + }{ + // x-ledger-store must pass through regardless of case. + {"x-ledger-store", "x-ledger-store", true}, + {"X-Ledger-Store", "x-ledger-store", true}, + // x-api-key must pass through regardless of case. + {"x-api-key", "x-api-key", true}, + {"X-Api-Key", "x-api-key", true}, + // Unrelated headers delegate to the default matcher (strips prefix, preserves case). + {"Grpc-Metadata-Custom", "Custom", true}, + {"X-Random-Header", "", false}, + } + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + key, pass := gatewayHeaderMatcher(tt.input) + if pass != tt.wantPass { + t.Errorf("gatewayHeaderMatcher(%q) pass = %v, want %v", tt.input, pass, tt.wantPass) + } + if pass && key != tt.wantKey { + t.Errorf("gatewayHeaderMatcher(%q) key = %q, want %q", tt.input, key, tt.wantKey) + } + }) + } +} diff --git a/internal/server/gateway_test.go b/internal/server/gateway_test.go index 6aa29d8..ab47471 100644 --- a/internal/server/gateway_test.go +++ b/internal/server/gateway_test.go @@ -2,16 +2,27 @@ package server_test import ( "bytes" + "context" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/x509" + "crypto/x509/pkix" "encoding/json" + "encoding/pem" "fmt" "io" + "math/big" + "net" "net/http" + "os" + "path/filepath" "testing" + "time" "github.com/rbaliyan/ledger/internal/config" "github.com/rbaliyan/ledger/internal/server" "github.com/rbaliyan/ledger/ledgerpb" - "context" _ "modernc.org/sqlite" ) @@ -39,7 +50,9 @@ func newGatewayServer(t *testing.T) (grpcAddr, httpAddr string) { func TestGatewayHealth(t *testing.T) { _, httpAddr := newGatewayServer(t) - resp, err := http.Get(fmt.Sprintf("http://%s/v1/health", httpAddr)) //nolint:noctx + req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, + fmt.Sprintf("http://%s/v1/health", httpAddr), nil) + resp, err := http.DefaultClient.Do(req) if err != nil { t.Fatalf("GET /v1/health: %v", err) } @@ -134,7 +147,7 @@ func TestGatewayAPIKeyAuth(t *testing.T) { base := fmt.Sprintf("http://%s", srv.HTTPAddr()) - // No API key — should be rejected. + // No API key — should be rejected with 401. req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, base+"/v1/streams/s/entries:count", nil) req.Header.Set(ledgerpb.StoreMetadataHeader, "test") @@ -161,3 +174,106 @@ func TestGatewayAPIKeyAuth(t *testing.T) { t.Errorf("expected 200, got %d", resp2.StatusCode) } } + +// TestGatewayErrorMapping verifies that gRPC status codes are translated to +// the correct HTTP status codes by the grpc-gateway error handler. +func TestGatewayErrorMapping(t *testing.T) { + _, httpAddr := newGatewayServer(t) + base := fmt.Sprintf("http://%s", httpAddr) + + // SetTags on a non-existent entry → gRPC NotFound → HTTP 404. + body, _ := json.Marshal(map[string]any{"tags": []string{"foo"}}) + req, _ := http.NewRequestWithContext(t.Context(), http.MethodPut, + base+"/v1/streams/s/entries/nonexistent/tags", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(ledgerpb.StoreMetadataHeader, "test") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("PUT tags: %v", err) + } + defer resp.Body.Close() //nolint:errcheck + if resp.StatusCode != http.StatusNotFound { + raw, _ := io.ReadAll(resp.Body) + t.Errorf("expected 404 for non-existent entry, got %d: %s", resp.StatusCode, raw) + } +} + +// TestGatewayTLS verifies that when gRPC is configured with TLS the gateway +// correctly dials back using the server certificate as the trusted CA. +func TestGatewayTLS(t *testing.T) { + certPEM, keyPEM := generateSelfSignedCert(t) + dir := t.TempDir() + certFile := filepath.Join(dir, "cert.pem") + keyFile := filepath.Join(dir, "key.pem") + if err := os.WriteFile(certFile, certPEM, 0o600); err != nil { + t.Fatalf("write cert: %v", err) + } + if err := os.WriteFile(keyFile, keyPEM, 0o600); err != nil { + t.Fatalf("write key: %v", err) + } + + cfg := &config.Config{ + Listen: "127.0.0.1:0", + HTTPListen: "127.0.0.1:0", + TLS: config.TLSConfig{ + Cert: certFile, + Key: keyFile, + // No CA: gateway falls back to using Cert as CA (self-signed). + }, + DB: config.DBConfig{ + Type: "sqlite", + SQLite: config.SQLiteConfig{Path: ":memory:"}, + }, + } + srv, err := server.New(t.Context(), cfg) + if err != nil { + t.Fatalf("server.New: %v", err) + } + t.Cleanup(func() { srv.Stop(context.Background()) }) + go func() { _ = srv.Serve() }() + + // The HTTP gateway itself serves plain HTTP; only the gRPC loopback uses TLS. + req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, + fmt.Sprintf("http://%s/v1/health", srv.HTTPAddr()), nil) + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("GET health over TLS-backed gateway: %v", err) + } + defer resp.Body.Close() //nolint:errcheck + if resp.StatusCode != http.StatusOK { + raw, _ := io.ReadAll(resp.Body) + t.Fatalf("expected 200, got %d: %s", resp.StatusCode, raw) + } +} + +// generateSelfSignedCert returns PEM-encoded certificate and private key for +// a certificate valid for localhost / 127.0.0.1. +func generateSelfSignedCert(t *testing.T) (certPEM, keyPEM []byte) { + t.Helper() + priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatalf("generate key: %v", err) + } + template := &x509.Certificate{ + SerialNumber: big.NewInt(1), + Subject: pkix.Name{CommonName: "localhost"}, + DNSNames: []string{"localhost"}, + IPAddresses: []net.IP{net.ParseIP("127.0.0.1")}, + NotBefore: time.Now().Add(-time.Hour), + NotAfter: time.Now().Add(time.Hour), + KeyUsage: x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + } + certDER, err := x509.CreateCertificate(rand.Reader, template, template, &priv.PublicKey, priv) + if err != nil { + t.Fatalf("create cert: %v", err) + } + certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) + privDER, err := x509.MarshalECPrivateKey(priv) + if err != nil { + t.Fatalf("marshal key: %v", err) + } + keyPEM = pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: privDER}) + return certPEM, keyPEM +} diff --git a/internal/server/server.go b/internal/server/server.go index 2b417b6..ecd27f7 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "database/sql" + "errors" "fmt" "io" "log/slog" @@ -12,6 +13,7 @@ import ( "net/http" "os" "sync" + "time" ledgerv1 "github.com/rbaliyan/ledger/api/ledger/v1" "github.com/rbaliyan/ledger/internal/config" @@ -77,7 +79,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { } if cfg.HTTPListen != "" { - httpHandler, err := newGatewayHandler(ctx, ln.Addr().String(), cfg) + httpHandler, err := newGatewayHandler(gatewayDialTarget(ln.Addr().String()), cfg) if err != nil { hookCancel() _ = ln.Close() @@ -91,7 +93,11 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { mux.Close(ctx) return nil, fmt.Errorf("server: http listen %s: %w", cfg.HTTPListen, err) } - srv.httpServer = &http.Server{Handler: httpHandler} + srv.httpServer = &http.Server{ + Handler: httpHandler, + ReadHeaderTimeout: 10 * time.Second, + IdleTimeout: 60 * time.Second, + } srv.httpListener = httpLn slog.Info("ledger HTTP gateway listening", "addr", httpLn.Addr().String()) } @@ -101,6 +107,9 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { if err != nil { hookCancel() _ = ln.Close() + if srv.httpListener != nil { + _ = srv.httpListener.Close() + } mux.Close(ctx) return nil, fmt.Errorf("server: hook: %w", err) } @@ -161,16 +170,39 @@ func (s *Server) ReloadHooks(cfg *config.Config) { slog.Info("hooks reloaded", "count", len(hooks)) } -// Serve starts the HTTP gateway (if configured) and blocks on the gRPC server. +// Serve starts both the gRPC server and the HTTP gateway (if configured), +// and returns the first error from either. Both servers are run in goroutines; +// callers must call Stop to shut them down. func (s *Server) Serve() error { + errCh := make(chan error, 2) + n := 1 if s.httpServer != nil { + n = 2 + go func() { + err := s.httpServer.Serve(s.httpListener) + if errors.Is(err, http.ErrServerClosed) { + err = nil + } + errCh <- err + }() + } + go func() { + err := s.grpc.Serve(s.listener) + if errors.Is(err, grpc.ErrServerStopped) { + err = nil + } + errCh <- err + }() + first := <-errCh + // Drain the second goroutine's result so it never blocks; log unexpected errors. + if n == 2 { go func() { - if err := s.httpServer.Serve(s.httpListener); err != nil && err != http.ErrServerClosed { - slog.Warn("HTTP gateway stopped", "err", err) + if err := <-errCh; err != nil { + slog.Warn("server stopped with error", "err", err) } }() } - return s.grpc.Serve(s.listener) + return first } // Stop gracefully drains in-flight RPCs, stops hook runners, then closes