Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
sudo apt-get install -y upx

- name: Run GoReleaser
uses: goreleaser/goreleaser-action@ec59f474b9834571250b370d4735c50f8e2d1e29 # v6
uses: goreleaser/goreleaser-action@e24998b8b67b290c2fa8b7c14fcfa7de2c5c9b8c # v7.1.0
with:
distribution: goreleaser
version: latest
Expand Down
4 changes: 4 additions & 0 deletions internal/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ listen: %q
# Optional: enable the HTTP/REST gateway on a separate port.
# When set, REST clients can call all gRPC methods via JSON over HTTP.
# Requires the x-ledger-store and x-api-key HTTP headers (same as gRPC).
# Note: the gateway serves plain HTTP even when gRPC is configured with TLS.
# Use a reverse proxy (nginx, Caddy) in front if HTTPS termination is needed.
# Note: bytes fields (e.g. payload) are base64-encoded in JSON responses.
# Note: changing this value requires a full restart (SIGHUP only reloads hooks).
# http_listen: "localhost:8080"

# Optional: write daemon logs to a file instead of stderr.
Expand Down
32 changes: 28 additions & 4 deletions internal/server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"os"
"strings"
Expand All @@ -23,7 +24,10 @@ import (
//
// The x-ledger-store and x-api-key HTTP headers are forwarded as gRPC metadata
// so the existing server-side auth interceptors handle authentication.
func newGatewayHandler(ctx context.Context, grpcAddr string, cfg *config.Config) (http.Handler, error) {
//
// Note: bytes fields (e.g. Entry.payload) are base64-encoded in JSON responses
// per the protobuf JSON mapping specification.
func newGatewayHandler(grpcAddr string, cfg *config.Config) (http.Handler, error) {
dialOpts, err := gatewayDialOpts(cfg)
if err != nil {
return nil, fmt.Errorf("gateway: dial options: %w", err)
Expand All @@ -32,7 +36,11 @@ func newGatewayHandler(ctx context.Context, grpcAddr string, cfg *config.Config)
mux := runtime.NewServeMux(
runtime.WithIncomingHeaderMatcher(gatewayHeaderMatcher),
)
if err := ledgerv1.RegisterLedgerServiceHandlerFromEndpoint(ctx, mux, grpcAddr, dialOpts); err != nil {
// Use context.Background so the loopback connection lives for the server's
// lifetime and is not affected by short-lived caller contexts.
if err := ledgerv1.RegisterLedgerServiceHandlerFromEndpoint(
context.Background(), mux, grpcAddr, dialOpts,
); err != nil {
return nil, fmt.Errorf("gateway: register: %w", err)
}
return mux, nil
Expand All @@ -42,12 +50,28 @@ func newGatewayHandler(ctx context.Context, grpcAddr string, cfg *config.Config)
// metadata in addition to the default grpc-gateway headers.
func gatewayHeaderMatcher(key string) (string, bool) {
switch strings.ToLower(key) {
case ledgerpb.StoreMetadataHeader, ledgerpb.APIKeyMetadataHeader:
return key, true
case ledgerpb.StoreMetadataHeader:
return ledgerpb.StoreMetadataHeader, true
case ledgerpb.APIKeyMetadataHeader:
return ledgerpb.APIKeyMetadataHeader, true
}
return runtime.DefaultHeaderMatcher(key)
}

// gatewayDialTarget returns the gRPC dial target for the loopback gateway
// connection. When the bound address uses a wildcard host (0.0.0.0 or ::),
// it is rewritten to 127.0.0.1 so traffic stays on the loopback interface.
func gatewayDialTarget(boundAddr string) string {
host, port, err := net.SplitHostPort(boundAddr)
if err != nil {
return boundAddr
}
if ip := net.ParseIP(host); ip != nil && ip.IsUnspecified() {
host = "127.0.0.1"
}
return net.JoinHostPort(host, port)
}

// gatewayDialOpts builds the gRPC dial options for the gateway→gRPC loopback.
// When TLS is configured on the gRPC server the gateway uses matching credentials;
// otherwise it dials plaintext.
Expand Down
55 changes: 55 additions & 0 deletions internal/server/gateway_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package server

import "testing"

func TestGatewayDialTarget(t *testing.T) {
tests := []struct {
name string
input string
want string
}{
{"ipv4 wildcard", "0.0.0.0:50051", "127.0.0.1:50051"},
{"ipv6 wildcard", "[::]:50051", "127.0.0.1:50051"},
{"specific ipv4", "127.0.0.1:50051", "127.0.0.1:50051"},
{"named host", "localhost:50051", "localhost:50051"},
{"non-loopback ip", "10.0.0.1:50051", "10.0.0.1:50051"},
{"malformed passthrough", "not-a-hostport", "not-a-hostport"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := gatewayDialTarget(tt.input)
if got != tt.want {
t.Errorf("gatewayDialTarget(%q) = %q, want %q", tt.input, got, tt.want)
}
})
}
}

func TestGatewayHeaderMatcher(t *testing.T) {
tests := []struct {
input string
wantKey string
wantPass bool
}{
// x-ledger-store must pass through regardless of case.
{"x-ledger-store", "x-ledger-store", true},
{"X-Ledger-Store", "x-ledger-store", true},
// x-api-key must pass through regardless of case.
{"x-api-key", "x-api-key", true},
{"X-Api-Key", "x-api-key", true},
// Unrelated headers delegate to the default matcher (strips prefix, preserves case).
{"Grpc-Metadata-Custom", "Custom", true},
{"X-Random-Header", "", false},
}
for _, tt := range tests {
t.Run(tt.input, func(t *testing.T) {
key, pass := gatewayHeaderMatcher(tt.input)
if pass != tt.wantPass {
t.Errorf("gatewayHeaderMatcher(%q) pass = %v, want %v", tt.input, pass, tt.wantPass)
}
if pass && key != tt.wantKey {
t.Errorf("gatewayHeaderMatcher(%q) key = %q, want %q", tt.input, key, tt.wantKey)
}
})
}
}
122 changes: 119 additions & 3 deletions internal/server/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,27 @@ package server_test

import (
"bytes"
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"encoding/pem"
"fmt"
"io"
"math/big"
"net"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/rbaliyan/ledger/internal/config"
"github.com/rbaliyan/ledger/internal/server"
"github.com/rbaliyan/ledger/ledgerpb"
"context"

_ "modernc.org/sqlite"
)
Expand Down Expand Up @@ -39,7 +50,9 @@ func newGatewayServer(t *testing.T) (grpcAddr, httpAddr string) {
func TestGatewayHealth(t *testing.T) {
_, httpAddr := newGatewayServer(t)

resp, err := http.Get(fmt.Sprintf("http://%s/v1/health", httpAddr)) //nolint:noctx
req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet,
fmt.Sprintf("http://%s/v1/health", httpAddr), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("GET /v1/health: %v", err)
}
Expand Down Expand Up @@ -134,7 +147,7 @@ func TestGatewayAPIKeyAuth(t *testing.T) {

base := fmt.Sprintf("http://%s", srv.HTTPAddr())

// No API key — should be rejected.
// No API key — should be rejected with 401.
req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet,
base+"/v1/streams/s/entries:count", nil)
req.Header.Set(ledgerpb.StoreMetadataHeader, "test")
Expand All @@ -161,3 +174,106 @@ func TestGatewayAPIKeyAuth(t *testing.T) {
t.Errorf("expected 200, got %d", resp2.StatusCode)
}
}

// TestGatewayErrorMapping verifies that gRPC status codes are translated to
// the correct HTTP status codes by the grpc-gateway error handler.
func TestGatewayErrorMapping(t *testing.T) {
_, httpAddr := newGatewayServer(t)
base := fmt.Sprintf("http://%s", httpAddr)

// SetTags on a non-existent entry → gRPC NotFound → HTTP 404.
body, _ := json.Marshal(map[string]any{"tags": []string{"foo"}})
req, _ := http.NewRequestWithContext(t.Context(), http.MethodPut,
base+"/v1/streams/s/entries/nonexistent/tags", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
req.Header.Set(ledgerpb.StoreMetadataHeader, "test")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("PUT tags: %v", err)
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusNotFound {
raw, _ := io.ReadAll(resp.Body)
t.Errorf("expected 404 for non-existent entry, got %d: %s", resp.StatusCode, raw)
}
}

// TestGatewayTLS verifies that when gRPC is configured with TLS the gateway
// correctly dials back using the server certificate as the trusted CA.
func TestGatewayTLS(t *testing.T) {
certPEM, keyPEM := generateSelfSignedCert(t)
dir := t.TempDir()
certFile := filepath.Join(dir, "cert.pem")
keyFile := filepath.Join(dir, "key.pem")
if err := os.WriteFile(certFile, certPEM, 0o600); err != nil {
t.Fatalf("write cert: %v", err)
}
if err := os.WriteFile(keyFile, keyPEM, 0o600); err != nil {
t.Fatalf("write key: %v", err)
}

cfg := &config.Config{
Listen: "127.0.0.1:0",
HTTPListen: "127.0.0.1:0",
TLS: config.TLSConfig{
Cert: certFile,
Key: keyFile,
// No CA: gateway falls back to using Cert as CA (self-signed).
},
DB: config.DBConfig{
Type: "sqlite",
SQLite: config.SQLiteConfig{Path: ":memory:"},
},
}
srv, err := server.New(t.Context(), cfg)
if err != nil {
t.Fatalf("server.New: %v", err)
}
t.Cleanup(func() { srv.Stop(context.Background()) })
go func() { _ = srv.Serve() }()

// The HTTP gateway itself serves plain HTTP; only the gRPC loopback uses TLS.
req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet,
fmt.Sprintf("http://%s/v1/health", srv.HTTPAddr()), nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("GET health over TLS-backed gateway: %v", err)
}
defer resp.Body.Close() //nolint:errcheck
if resp.StatusCode != http.StatusOK {
raw, _ := io.ReadAll(resp.Body)
t.Fatalf("expected 200, got %d: %s", resp.StatusCode, raw)
}
}

// generateSelfSignedCert returns PEM-encoded certificate and private key for
// a certificate valid for localhost / 127.0.0.1.
func generateSelfSignedCert(t *testing.T) (certPEM, keyPEM []byte) {
t.Helper()
priv, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
t.Fatalf("generate key: %v", err)
}
template := &x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "localhost"},
DNSNames: []string{"localhost"},
IPAddresses: []net.IP{net.ParseIP("127.0.0.1")},
NotBefore: time.Now().Add(-time.Hour),
NotAfter: time.Now().Add(time.Hour),
KeyUsage: x509.KeyUsageDigitalSignature,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
}
certDER, err := x509.CreateCertificate(rand.Reader, template, template, &priv.PublicKey, priv)
if err != nil {
t.Fatalf("create cert: %v", err)
}
certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER})
privDER, err := x509.MarshalECPrivateKey(priv)
if err != nil {
t.Fatalf("marshal key: %v", err)
}
keyPEM = pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: privDER})
return certPEM, keyPEM
}
44 changes: 38 additions & 6 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"crypto/tls"
"crypto/x509"
"database/sql"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"os"
"sync"
"time"

ledgerv1 "github.com/rbaliyan/ledger/api/ledger/v1"
"github.com/rbaliyan/ledger/internal/config"
Expand Down Expand Up @@ -77,7 +79,7 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
}

if cfg.HTTPListen != "" {
httpHandler, err := newGatewayHandler(ctx, ln.Addr().String(), cfg)
httpHandler, err := newGatewayHandler(gatewayDialTarget(ln.Addr().String()), cfg)
if err != nil {
hookCancel()
_ = ln.Close()
Expand All @@ -91,7 +93,11 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
mux.Close(ctx)
return nil, fmt.Errorf("server: http listen %s: %w", cfg.HTTPListen, err)
}
srv.httpServer = &http.Server{Handler: httpHandler}
srv.httpServer = &http.Server{
Handler: httpHandler,
ReadHeaderTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
}
srv.httpListener = httpLn
slog.Info("ledger HTTP gateway listening", "addr", httpLn.Addr().String())
}
Expand All @@ -101,6 +107,9 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) {
if err != nil {
hookCancel()
_ = ln.Close()
if srv.httpListener != nil {
_ = srv.httpListener.Close()
}
mux.Close(ctx)
return nil, fmt.Errorf("server: hook: %w", err)
}
Expand Down Expand Up @@ -161,16 +170,39 @@ func (s *Server) ReloadHooks(cfg *config.Config) {
slog.Info("hooks reloaded", "count", len(hooks))
}

// Serve starts the HTTP gateway (if configured) and blocks on the gRPC server.
// Serve starts both the gRPC server and the HTTP gateway (if configured),
// and returns the first error from either. Both servers are run in goroutines;
// callers must call Stop to shut them down.
func (s *Server) Serve() error {
errCh := make(chan error, 2)
n := 1
if s.httpServer != nil {
n = 2
go func() {
err := s.httpServer.Serve(s.httpListener)
if errors.Is(err, http.ErrServerClosed) {
err = nil
}
errCh <- err
}()
}
go func() {
err := s.grpc.Serve(s.listener)
if errors.Is(err, grpc.ErrServerStopped) {
err = nil
}
errCh <- err
}()
first := <-errCh
// Drain the second goroutine's result so it never blocks; log unexpected errors.
if n == 2 {
go func() {
if err := s.httpServer.Serve(s.httpListener); err != nil && err != http.ErrServerClosed {
slog.Warn("HTTP gateway stopped", "err", err)
if err := <-errCh; err != nil {
slog.Warn("server stopped with error", "err", err)
}
}()
}
return s.grpc.Serve(s.listener)
return first
}

// Stop gracefully drains in-flight RPCs, stops hook runners, then closes
Expand Down
Loading