diff --git a/api/ledger/v1/ledger.pb.go b/api/ledger/v1/ledger.pb.go index 4f20c9c..647d027 100644 --- a/api/ledger/v1/ledger.pb.go +++ b/api/ledger/v1/ledger.pb.go @@ -7,6 +7,7 @@ package ledgerv1 import ( + _ "google.golang.org/genproto/googleapis/api/annotations" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -120,8 +121,9 @@ func (x *EntryInput) GetTags() []string { // Entry is a stored ledger entry returned by Read. type Entry struct { state protoimpl.MessageState `protogen:"open.v1"` - // id is the store-assigned unique identifier (decimal int64 for SQL backends, - // hex ObjectID string for MongoDB). + // id is the store-assigned unique identifier. The format depends on the + // backend: decimal int64 string for SQLite/PostgreSQL, MongoDB ObjectID + // hex string, or a 32-char time-ordered hex string for ClickHouse. Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // stream is the stream this entry belongs to. Stream string `protobuf:"bytes,2,opt,name=stream,proto3" json:"stream,omitempty"` @@ -1432,7 +1434,7 @@ var File_ledger_v1_ledger_proto protoreflect.FileDescriptor const file_ledger_v1_ledger_proto_rawDesc = "" + "\n" + - "\x16ledger/v1/ledger.proto\x12\tledger.v1\x1a\x1fgoogle/protobuf/timestamp.proto\"\x99\x02\n" + + "\x16ledger/v1/ledger.proto\x12\tledger.v1\x1a\x1cgoogle/api/annotations.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\x99\x02\n" + "\n" + "EntryInput\x12\x18\n" + "\apayload\x18\x01 \x01(\fR\apayload\x12\x1b\n" + @@ -1530,19 +1532,20 @@ const file_ledger_v1_ledger_proto_rawDesc = "" + "\aentries\x18\x01 \x03(\v2\x10.ledger.v1.EntryR\aentries\"\x0f\n" + "\rHealthRequest\"(\n" + "\x0eHealthResponse\x12\x16\n" + - "\x06status\x18\x01 \x01(\tR\x06status2\xf1\x05\n" + - "\rLedgerService\x12=\n" + - "\x06Append\x12\x18.ledger.v1.AppendRequest\x1a\x19.ledger.v1.AppendResponse\x127\n" + - "\x04Read\x12\x16.ledger.v1.ReadRequest\x1a\x17.ledger.v1.ReadResponse\x12:\n" + - "\x05Count\x12\x17.ledger.v1.CountRequest\x1a\x18.ledger.v1.CountResponse\x12@\n" + - "\aSetTags\x12\x19.ledger.v1.SetTagsRequest\x1a\x1a.ledger.v1.SetTagsResponse\x12U\n" + - "\x0eSetAnnotations\x12 .ledger.v1.SetAnnotationsRequest\x1a!.ledger.v1.SetAnnotationsResponse\x127\n" + - "\x04Trim\x12\x16.ledger.v1.TrimRequest\x1a\x17.ledger.v1.TrimResponse\x12R\n" + - "\rListStreamIDs\x12\x1f.ledger.v1.ListStreamIDsRequest\x1a .ledger.v1.ListStreamIDsResponse\x12O\n" + - "\fRenameStream\x12\x1e.ledger.v1.RenameStreamRequest\x1a\x1f.ledger.v1.RenameStreamResponse\x127\n" + - "\x04Stat\x12\x16.ledger.v1.StatRequest\x1a\x17.ledger.v1.StatResponse\x12=\n" + - "\x06Search\x12\x18.ledger.v1.SearchRequest\x1a\x19.ledger.v1.SearchResponse\x12=\n" + - "\x06Health\x12\x18.ledger.v1.HealthRequest\x1a\x19.ledger.v1.HealthResponseB3Z1github.com/rbaliyan/ledger/api/ledger/v1;ledgerv1b\x06proto3" + "\x06status\x18\x01 \x01(\tR\x06status2\xaa\t\n" + + "\rLedgerService\x12f\n" + + "\x06Append\x12\x18.ledger.v1.AppendRequest\x1a\x19.ledger.v1.AppendResponse\"'\x82\xd3\xe4\x93\x02!:\x01*\"\x1c/v1/streams/{stream}/entries\x12]\n" + + "\x04Read\x12\x16.ledger.v1.ReadRequest\x1a\x17.ledger.v1.ReadResponse\"$\x82\xd3\xe4\x93\x02\x1e\x12\x1c/v1/streams/{stream}/entries\x12f\n" + + "\x05Count\x12\x17.ledger.v1.CountRequest\x1a\x18.ledger.v1.CountResponse\"*\x82\xd3\xe4\x93\x02$\x12\"/v1/streams/{stream}/entries:count\x12s\n" + + "\aSetTags\x12\x19.ledger.v1.SetTagsRequest\x1a\x1a.ledger.v1.SetTagsResponse\"1\x82\xd3\xe4\x93\x02+:\x01*\x1a&/v1/streams/{stream}/entries/{id}/tags\x12\x8f\x01\n" + + "\x0eSetAnnotations\x12 .ledger.v1.SetAnnotationsRequest\x1a!.ledger.v1.SetAnnotationsResponse\"8\x82\xd3\xe4\x93\x022:\x01*2-/v1/streams/{stream}/entries/{id}/annotations\x12e\n" + + "\x04Trim\x12\x16.ledger.v1.TrimRequest\x1a\x17.ledger.v1.TrimResponse\",\x82\xd3\xe4\x93\x02&:\x01*\"!/v1/streams/{stream}/entries:trim\x12g\n" + + "\rListStreamIDs\x12\x1f.ledger.v1.ListStreamIDsRequest\x1a .ledger.v1.ListStreamIDsResponse\"\x13\x82\xd3\xe4\x93\x02\r\x12\v/v1/streams\x12u\n" + + "\fRenameStream\x12\x1e.ledger.v1.RenameStreamRequest\x1a\x1f.ledger.v1.RenameStreamResponse\"$\x82\xd3\xe4\x93\x02\x1e:\x01*\"\x19/v1/streams/{name}:rename\x12Z\n" + + "\x04Stat\x12\x16.ledger.v1.StatRequest\x1a\x17.ledger.v1.StatResponse\"!\x82\xd3\xe4\x93\x02\x1b\x12\x19/v1/streams/{stream}:stat\x12m\n" + + "\x06Search\x12\x18.ledger.v1.SearchRequest\x1a\x19.ledger.v1.SearchResponse\".\x82\xd3\xe4\x93\x02(:\x01*\"#/v1/streams/{stream}/entries:search\x12Q\n" + + "\x06Health\x12\x18.ledger.v1.HealthRequest\x1a\x19.ledger.v1.HealthResponse\"\x12\x82\xd3\xe4\x93\x02\f\x12\n" + + "/v1/healthB3Z1github.com/rbaliyan/ledger/api/ledger/v1;ledgerv1b\x06proto3" var ( file_ledger_v1_ledger_proto_rawDescOnce sync.Once diff --git a/api/ledger/v1/ledger.pb.gw.go b/api/ledger/v1/ledger.pb.gw.go new file mode 100644 index 0000000..dd86a62 --- /dev/null +++ b/api/ledger/v1/ledger.pb.gw.go @@ -0,0 +1,1009 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: ledger/v1/ledger.proto + +/* +Package ledgerv1 is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package ledgerv1 + +import ( + "context" + "errors" + "io" + "net/http" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + "github.com/grpc-ecosystem/grpc-gateway/v2/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" +) + +// Suppress "imported and not used" errors +var ( + _ codes.Code + _ io.Reader + _ status.Status + _ = errors.New + _ = runtime.String + _ = utilities.NewDoubleArray + _ = metadata.Join +) + +func request_LedgerService_Append_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq AppendRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := client.Append(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_Append_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq AppendRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := server.Append(ctx, &protoReq) + return msg, metadata, err +} + +var filter_LedgerService_Read_0 = &utilities.DoubleArray{Encoding: map[string]int{"stream": 0}, Base: []int{1, 1, 0}, Check: []int{0, 1, 2}} + +func request_LedgerService_Read_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq ReadRequest + metadata runtime.ServerMetadata + err error + ) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + if err := req.ParseForm(); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_LedgerService_Read_0); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + msg, err := client.Read(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_Read_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq ReadRequest + metadata runtime.ServerMetadata + err error + ) + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + if err := req.ParseForm(); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_LedgerService_Read_0); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + msg, err := server.Read(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_Count_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq CountRequest + metadata runtime.ServerMetadata + err error + ) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := client.Count(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_Count_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq CountRequest + metadata runtime.ServerMetadata + err error + ) + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := server.Count(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_SetTags_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq SetTagsRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + val, ok = pathParams["id"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id") + } + protoReq.Id, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err) + } + msg, err := client.SetTags(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_SetTags_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq SetTagsRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + val, ok = pathParams["id"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id") + } + protoReq.Id, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err) + } + msg, err := server.SetTags(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_SetAnnotations_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq SetAnnotationsRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + val, ok = pathParams["id"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id") + } + protoReq.Id, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err) + } + msg, err := client.SetAnnotations(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_SetAnnotations_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq SetAnnotationsRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + val, ok = pathParams["id"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "id") + } + protoReq.Id, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "id", err) + } + msg, err := server.SetAnnotations(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_Trim_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq TrimRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := client.Trim(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_Trim_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq TrimRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := server.Trim(ctx, &protoReq) + return msg, metadata, err +} + +var filter_LedgerService_ListStreamIDs_0 = &utilities.DoubleArray{Encoding: map[string]int{}, Base: []int(nil), Check: []int(nil)} + +func request_LedgerService_ListStreamIDs_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq ListStreamIDsRequest + metadata runtime.ServerMetadata + ) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + if err := req.ParseForm(); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_LedgerService_ListStreamIDs_0); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + msg, err := client.ListStreamIDs(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_ListStreamIDs_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq ListStreamIDsRequest + metadata runtime.ServerMetadata + ) + if err := req.ParseForm(); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if err := runtime.PopulateQueryParameters(&protoReq, req.Form, filter_LedgerService_ListStreamIDs_0); err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + msg, err := server.ListStreamIDs(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_RenameStream_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq RenameStreamRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["name"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name") + } + protoReq.Name, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err) + } + msg, err := client.RenameStream(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_RenameStream_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq RenameStreamRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + val, ok := pathParams["name"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "name") + } + protoReq.Name, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "name", err) + } + msg, err := server.RenameStream(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_Stat_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq StatRequest + metadata runtime.ServerMetadata + err error + ) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := client.Stat(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_Stat_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq StatRequest + metadata runtime.ServerMetadata + err error + ) + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := server.Stat(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_Search_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq SearchRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := client.Search(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_Search_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq SearchRequest + metadata runtime.ServerMetadata + err error + ) + if err := marshaler.NewDecoder(req.Body).Decode(&protoReq); err != nil && !errors.Is(err, io.EOF) { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + val, ok := pathParams["stream"] + if !ok { + return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "stream") + } + protoReq.Stream, err = runtime.String(val) + if err != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "stream", err) + } + msg, err := server.Search(ctx, &protoReq) + return msg, metadata, err +} + +func request_LedgerService_Health_0(ctx context.Context, marshaler runtime.Marshaler, client LedgerServiceClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq HealthRequest + metadata runtime.ServerMetadata + ) + if req.Body != nil { + _, _ = io.Copy(io.Discard, req.Body) + } + msg, err := client.Health(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err +} + +func local_request_LedgerService_Health_0(ctx context.Context, marshaler runtime.Marshaler, server LedgerServiceServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var ( + protoReq HealthRequest + metadata runtime.ServerMetadata + ) + msg, err := server.Health(ctx, &protoReq) + return msg, metadata, err +} + +// RegisterLedgerServiceHandlerServer registers the http handlers for service LedgerService to "mux". +// UnaryRPC :call LedgerServiceServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterLedgerServiceHandlerFromEndpoint instead. +// GRPC interceptors will not work for this type of registration. To use interceptors, you must use the "runtime.WithMiddlewares" option in the "runtime.NewServeMux" call. +func RegisterLedgerServiceHandlerServer(ctx context.Context, mux *runtime.ServeMux, server LedgerServiceServer) error { + mux.Handle(http.MethodPost, pattern_LedgerService_Append_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/Append", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_Append_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Append_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Read_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/Read", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_Read_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Read_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Count_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/Count", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries:count")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_Count_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Count_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPut, pattern_LedgerService_SetTags_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/SetTags", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries/{id}/tags")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_SetTags_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_SetTags_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPatch, pattern_LedgerService_SetAnnotations_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/SetAnnotations", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries/{id}/annotations")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_SetAnnotations_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_SetAnnotations_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPost, pattern_LedgerService_Trim_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/Trim", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries:trim")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_Trim_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Trim_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_ListStreamIDs_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/ListStreamIDs", runtime.WithHTTPPathPattern("/v1/streams")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_ListStreamIDs_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_ListStreamIDs_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPost, pattern_LedgerService_RenameStream_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/RenameStream", runtime.WithHTTPPathPattern("/v1/streams/{name}:rename")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_RenameStream_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_RenameStream_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Stat_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/Stat", runtime.WithHTTPPathPattern("/v1/streams/{stream}:stat")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_Stat_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Stat_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPost, pattern_LedgerService_Search_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/Search", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries:search")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_Search_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Search_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Health_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateIncomingContext(ctx, mux, req, "/ledger.v1.LedgerService/Health", runtime.WithHTTPPathPattern("/v1/health")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_LedgerService_Health_0(annotatedContext, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Health_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + + return nil +} + +// RegisterLedgerServiceHandlerFromEndpoint is same as RegisterLedgerServiceHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterLedgerServiceHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.NewClient(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Errorf("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + return RegisterLedgerServiceHandler(ctx, mux, conn) +} + +// RegisterLedgerServiceHandler registers the http handlers for service LedgerService to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterLedgerServiceHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterLedgerServiceHandlerClient(ctx, mux, NewLedgerServiceClient(conn)) +} + +// RegisterLedgerServiceHandlerClient registers the http handlers for service LedgerService +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "LedgerServiceClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "LedgerServiceClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "LedgerServiceClient" to call the correct interceptors. This client ignores the HTTP middlewares. +func RegisterLedgerServiceHandlerClient(ctx context.Context, mux *runtime.ServeMux, client LedgerServiceClient) error { + mux.Handle(http.MethodPost, pattern_LedgerService_Append_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/Append", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_Append_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Append_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Read_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/Read", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_Read_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Read_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Count_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/Count", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries:count")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_Count_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Count_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPut, pattern_LedgerService_SetTags_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/SetTags", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries/{id}/tags")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_SetTags_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_SetTags_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPatch, pattern_LedgerService_SetAnnotations_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/SetAnnotations", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries/{id}/annotations")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_SetAnnotations_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_SetAnnotations_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPost, pattern_LedgerService_Trim_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/Trim", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries:trim")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_Trim_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Trim_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_ListStreamIDs_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/ListStreamIDs", runtime.WithHTTPPathPattern("/v1/streams")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_ListStreamIDs_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_ListStreamIDs_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPost, pattern_LedgerService_RenameStream_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/RenameStream", runtime.WithHTTPPathPattern("/v1/streams/{name}:rename")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_RenameStream_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_RenameStream_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Stat_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/Stat", runtime.WithHTTPPathPattern("/v1/streams/{stream}:stat")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_Stat_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Stat_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodPost, pattern_LedgerService_Search_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/Search", runtime.WithHTTPPathPattern("/v1/streams/{stream}/entries:search")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_Search_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Search_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + mux.Handle(http.MethodGet, pattern_LedgerService_Health_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + annotatedContext, err := runtime.AnnotateContext(ctx, mux, req, "/ledger.v1.LedgerService/Health", runtime.WithHTTPPathPattern("/v1/health")) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_LedgerService_Health_0(annotatedContext, inboundMarshaler, client, req, pathParams) + annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md) + if err != nil { + runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err) + return + } + forward_LedgerService_Health_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + }) + return nil +} + +var ( + pattern_LedgerService_Append_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3}, []string{"v1", "streams", "stream", "entries"}, "")) + pattern_LedgerService_Read_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3}, []string{"v1", "streams", "stream", "entries"}, "")) + pattern_LedgerService_Count_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3}, []string{"v1", "streams", "stream", "entries"}, "count")) + pattern_LedgerService_SetTags_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3, 1, 0, 4, 1, 5, 4, 2, 5}, []string{"v1", "streams", "stream", "entries", "id", "tags"}, "")) + pattern_LedgerService_SetAnnotations_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3, 1, 0, 4, 1, 5, 4, 2, 5}, []string{"v1", "streams", "stream", "entries", "id", "annotations"}, "")) + pattern_LedgerService_Trim_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3}, []string{"v1", "streams", "stream", "entries"}, "trim")) + pattern_LedgerService_ListStreamIDs_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "streams"}, "")) + pattern_LedgerService_RenameStream_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"v1", "streams", "name"}, "rename")) + pattern_LedgerService_Stat_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2}, []string{"v1", "streams", "stream"}, "stat")) + pattern_LedgerService_Search_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 1, 0, 4, 1, 5, 2, 2, 3}, []string{"v1", "streams", "stream", "entries"}, "search")) + pattern_LedgerService_Health_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1}, []string{"v1", "health"}, "")) +) + +var ( + forward_LedgerService_Append_0 = runtime.ForwardResponseMessage + forward_LedgerService_Read_0 = runtime.ForwardResponseMessage + forward_LedgerService_Count_0 = runtime.ForwardResponseMessage + forward_LedgerService_SetTags_0 = runtime.ForwardResponseMessage + forward_LedgerService_SetAnnotations_0 = runtime.ForwardResponseMessage + forward_LedgerService_Trim_0 = runtime.ForwardResponseMessage + forward_LedgerService_ListStreamIDs_0 = runtime.ForwardResponseMessage + forward_LedgerService_RenameStream_0 = runtime.ForwardResponseMessage + forward_LedgerService_Stat_0 = runtime.ForwardResponseMessage + forward_LedgerService_Search_0 = runtime.ForwardResponseMessage + forward_LedgerService_Health_0 = runtime.ForwardResponseMessage +) diff --git a/api/ledger/v1/ledger_grpc.pb.go b/api/ledger/v1/ledger_grpc.pb.go index 2c6cbfb..d0e9ae9 100644 --- a/api/ledger/v1/ledger_grpc.pb.go +++ b/api/ledger/v1/ledger_grpc.pb.go @@ -38,8 +38,10 @@ const ( // // LedgerService exposes the append-only log operations over gRPC. // -// A server instance is bound to exactly one store (table / collection). -// Within that store, streams are identified by the "stream" field in each request. +// Every request must carry the "x-ledger-store" gRPC metadata header, which +// selects the target store (table / collection). The server routes each call +// to the appropriate backend based on this header. Within a store, individual +// log instances are identified by the "stream" field in each request. type LedgerServiceClient interface { // Append adds entries to the named stream. // Entries whose DedupKey already exists in the stream are silently skipped. @@ -47,6 +49,7 @@ type LedgerServiceClient interface { Append(ctx context.Context, in *AppendRequest, opts ...grpc.CallOption) (*AppendResponse, error) // Read returns entries from the named stream, ordered by ID. // An empty stream returns an empty list (not an error). + // Query params: options.after, options.limit, options.desc, options.order_key, options.tag, options.all_tags. Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) // Count returns the total number of entries in the named stream. Count(ctx context.Context, in *CountRequest, opts ...grpc.CallOption) (*CountResponse, error) @@ -63,6 +66,7 @@ type LedgerServiceClient interface { // ListStreamIDs returns distinct stream IDs that have at least one entry. // Streams that have been fully trimmed are not returned. // Results are ascending by stream ID and cursor-paginated via after / limit. + // Query params: after, limit. ListStreamIDs(ctx context.Context, in *ListStreamIDsRequest, opts ...grpc.CallOption) (*ListStreamIDsResponse, error) // RenameStream changes the human-readable name of a stream without touching // its entries. Returns NOT_FOUND if the current name does not exist. @@ -201,8 +205,10 @@ func (c *ledgerServiceClient) Health(ctx context.Context, in *HealthRequest, opt // // LedgerService exposes the append-only log operations over gRPC. // -// A server instance is bound to exactly one store (table / collection). -// Within that store, streams are identified by the "stream" field in each request. +// Every request must carry the "x-ledger-store" gRPC metadata header, which +// selects the target store (table / collection). The server routes each call +// to the appropriate backend based on this header. Within a store, individual +// log instances are identified by the "stream" field in each request. type LedgerServiceServer interface { // Append adds entries to the named stream. // Entries whose DedupKey already exists in the stream are silently skipped. @@ -210,6 +216,7 @@ type LedgerServiceServer interface { Append(context.Context, *AppendRequest) (*AppendResponse, error) // Read returns entries from the named stream, ordered by ID. // An empty stream returns an empty list (not an error). + // Query params: options.after, options.limit, options.desc, options.order_key, options.tag, options.all_tags. Read(context.Context, *ReadRequest) (*ReadResponse, error) // Count returns the total number of entries in the named stream. Count(context.Context, *CountRequest) (*CountResponse, error) @@ -226,6 +233,7 @@ type LedgerServiceServer interface { // ListStreamIDs returns distinct stream IDs that have at least one entry. // Streams that have been fully trimmed are not returned. // Results are ascending by stream ID and cursor-paginated via after / limit. + // Query params: after, limit. ListStreamIDs(context.Context, *ListStreamIDsRequest) (*ListStreamIDsResponse, error) // RenameStream changes the human-readable name of a stream without touching // its entries. Returns NOT_FOUND if the current name does not exist. diff --git a/buf.gen.yaml b/buf.gen.yaml index 31548c6..e913ce1 100644 --- a/buf.gen.yaml +++ b/buf.gen.yaml @@ -8,3 +8,7 @@ plugins: out: api opt: - paths=source_relative + - remote: buf.build/grpc-ecosystem/gateway + out: api + opt: + - paths=source_relative diff --git a/buf.lock b/buf.lock index 06f3185..7dd360c 100644 --- a/buf.lock +++ b/buf.lock @@ -1,6 +1,9 @@ # Generated by buf. DO NOT EDIT. version: v2 deps: + - name: buf.build/googleapis/googleapis + commit: c17df5b2beca46928cc87d5656bd5343 + digest: b5:648a01e0170d4512dea7d564016165decd1ed6e34bef79fe54753e51ad7e27545709ad9157d7551270147d551155c595a2fb0bf5bb33b1c83040ddbce915c604 - name: buf.build/protocolbuffers/wellknowntypes commit: 9d16d599a978406980f6e2f081331a93 digest: b5:dd06e497a5c52f5ddf6ec02b3c7d289cc6c0432093fc2f6bf7a4fb5fae786c3e4c893e55d2759ffb6833268daf3de0bce303a406fed15725790528f2c27dc219 diff --git a/buf.yaml b/buf.yaml index c33a92b..725e50c 100644 --- a/buf.yaml +++ b/buf.yaml @@ -3,6 +3,7 @@ modules: - path: proto deps: - buf.build/protocolbuffers/wellknowntypes + - buf.build/googleapis/googleapis lint: use: - STANDARD diff --git a/go.mod b/go.mod index b5a64b0..3281f65 100644 --- a/go.mod +++ b/go.mod @@ -11,11 +11,14 @@ require ( go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 + google.golang.org/genproto/googleapis/api v0.0.0-20260414002931-afd174a4e478 google.golang.org/grpc v1.80.0 google.golang.org/protobuf v1.36.11 modernc.org/sqlite v1.48.2 ) +require github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 + require ( github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/ClickHouse/ch-go v0.71.0 // indirect @@ -45,10 +48,10 @@ require ( go.yaml.in/yaml/v3 v3.0.4 golang.org/x/crypto v0.48.0 // indirect golang.org/x/net v0.51.0 // indirect - golang.org/x/sync v0.19.0 // indirect + golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.42.0 // indirect - golang.org/x/text v0.34.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 // indirect + golang.org/x/text v0.36.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 // indirect modernc.org/libc v1.70.0 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/go.sum b/go.sum index 1ee3450..a6302c0 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 h1:5VipnvEpbqr2gA2VbM+nYVbkIF28c5ZQfqCBQ5g2xfk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0/go.mod h1:Hyl3n6Twe1hvtd9XUXDec4pTvgMSEixRuQKPTMH2bNs= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -130,8 +132,8 @@ golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVo golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= -golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= +golang.org/x/mod v0.34.0 h1:xIHgNUUnW6sYkcM5Jleh05DvLOtwc6RitGHbDk4akRI= +golang.org/x/mod v0.34.0/go.mod h1:ykgH52iCZe79kzLLMhyCUzhMci+nQj+0XkbXpNYtVjY= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -146,8 +148,8 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -166,23 +168,25 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= -golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/text v0.36.0 h1:JfKh3XmcRPqZPKevfXVpI1wXPTqbkE5f7JA92a55Yxg= +golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= -golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= +golang.org/x/tools v0.43.0 h1:12BdW9CeB3Z+J/I/wj34VMl8X+fEXBxVR90JeMX5E7s= +golang.org/x/tools v0.43.0/go.mod h1:uHkMso649BX2cZK6+RpuIPXS3ho2hZo4FVwfoy1vIk0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4= gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516 h1:sNrWoksmOyF5bvJUcnmbeAmQi8baNhqg5IWaI3llQqU= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260120221211-b8f7ae30c516/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= +google.golang.org/genproto/googleapis/api v0.0.0-20260414002931-afd174a4e478 h1:yQugLulqltosq0B/f8l4w9VryjV+N/5gcW0jQ3N8Qec= +google.golang.org/genproto/googleapis/api v0.0.0-20260414002931-afd174a4e478/go.mod h1:C6ADNqOxbgdUUeRTU+LCHDPB9ttAMCTff6auwCVa4uc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478 h1:RmoJA1ujG+/lRGNfUnOMfhCy5EipVMyvUE+KNbPbTlw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260414002931-afd174a4e478/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM= google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= diff --git a/internal/cli/client.go b/internal/cli/client.go index daa907e..87f5457 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -34,6 +34,9 @@ func clientConfig() (*config.Config, error) { if flagAddr != "" { cfg.Listen = flagAddr } + if flagHTTPAddr != "" { + cfg.HTTPListen = flagHTTPAddr + } if flagAPIKey != "" { cfg.APIKey = flagAPIKey } diff --git a/internal/cli/root.go b/internal/cli/root.go index 0c0fa3c..44d9cf6 100644 --- a/internal/cli/root.go +++ b/internal/cli/root.go @@ -5,9 +5,10 @@ import ( ) var ( - flagConfig string - flagAPIKey string - flagAddr string + flagConfig string + flagAPIKey string + flagAddr string + flagHTTPAddr string ) // Root returns the top-level cobra command. @@ -27,7 +28,8 @@ Stream subcommands let you append, read, tag, annotate, trim, and tail streams.` root.PersistentFlags().StringVarP(&flagConfig, "config", "c", "", "config file path (default: ~/.ledger/config.yaml)") root.PersistentFlags().StringVar(&flagAPIKey, "api-key", "", "API key for authentication (overrides config)") - root.PersistentFlags().StringVar(&flagAddr, "addr", "", "daemon address (overrides config listen address)") + root.PersistentFlags().StringVar(&flagAddr, "addr", "", "daemon gRPC address (overrides config listen address)") + root.PersistentFlags().StringVar(&flagHTTPAddr, "http-addr", "", "daemon HTTP gateway address (overrides config http_listen)") root.AddCommand( newVersionCmd(), diff --git a/internal/cli/start.go b/internal/cli/start.go index 0b15e35..0c3f5fa 100644 --- a/internal/cli/start.go +++ b/internal/cli/start.go @@ -213,6 +213,9 @@ func startBackground(cmd *cobra.Command, cfg *config.Config) error { if flagAddr != "" { args = append(args, "--addr", flagAddr) } + if flagHTTPAddr != "" { + args = append(args, "--http-addr", flagHTTPAddr) + } // API key is passed via environment, not argv, to avoid exposure in `ps` output. env := os.Environ() if flagAPIKey != "" { @@ -299,6 +302,11 @@ func writeDefaultConfig(cfg *config.Config) error { # Address the gRPC server listens on. listen: %q +# Optional: enable the HTTP/REST gateway on a separate port. +# When set, REST clients can call all gRPC methods via JSON over HTTP. +# Requires the x-ledger-store and x-api-key HTTP headers (same as gRPC). +# http_listen: "localhost:8080" + # Optional: write daemon logs to a file instead of stderr. # log_file: "" diff --git a/internal/config/config.go b/internal/config/config.go index dde9394..aa19693 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ import ( // Config holds the full ledger daemon configuration. type Config struct { Listen string `yaml:"listen"` + HTTPListen string `yaml:"http_listen"` // optional HTTP/REST gateway address; empty = disabled LogFile string `yaml:"log_file"` APIKey string `yaml:"api_key"` AllowedStores []string `yaml:"allowed_stores"` // optional allow-list of store names for the API key @@ -98,6 +99,11 @@ func (c *Config) Validate() error { if c.DB.Type == "" { return fmt.Errorf("ledger: config: db.type must be set (e.g. sqlite, postgres)") } + if c.HTTPListen != "" { + if _, _, err := net.SplitHostPort(c.HTTPListen); err != nil { + return fmt.Errorf("ledger: config: invalid http_listen address %q: %w", c.HTTPListen, err) + } + } if c.TLS.Cert != "" && c.TLS.Key == "" { return fmt.Errorf("ledger: config: tls.cert requires tls.key") } diff --git a/internal/server/gateway.go b/internal/server/gateway.go new file mode 100644 index 0000000..66277e8 --- /dev/null +++ b/internal/server/gateway.go @@ -0,0 +1,77 @@ +package server + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "net/http" + "os" + "strings" + + "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" + ledgerv1 "github.com/rbaliyan/ledger/api/ledger/v1" + "github.com/rbaliyan/ledger/internal/config" + "github.com/rbaliyan/ledger/ledgerpb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" +) + +// newGatewayHandler dials the local gRPC server and returns an HTTP handler +// that translates REST requests to gRPC calls. +// +// The x-ledger-store and x-api-key HTTP headers are forwarded as gRPC metadata +// so the existing server-side auth interceptors handle authentication. +func newGatewayHandler(ctx context.Context, grpcAddr string, cfg *config.Config) (http.Handler, error) { + dialOpts, err := gatewayDialOpts(cfg) + if err != nil { + return nil, fmt.Errorf("gateway: dial options: %w", err) + } + + mux := runtime.NewServeMux( + runtime.WithIncomingHeaderMatcher(gatewayHeaderMatcher), + ) + if err := ledgerv1.RegisterLedgerServiceHandlerFromEndpoint(ctx, mux, grpcAddr, dialOpts); err != nil { + return nil, fmt.Errorf("gateway: register: %w", err) + } + return mux, nil +} + +// gatewayHeaderMatcher passes x-ledger-store and x-api-key through to gRPC +// metadata in addition to the default grpc-gateway headers. +func gatewayHeaderMatcher(key string) (string, bool) { + switch strings.ToLower(key) { + case ledgerpb.StoreMetadataHeader, ledgerpb.APIKeyMetadataHeader: + return key, true + } + return runtime.DefaultHeaderMatcher(key) +} + +// gatewayDialOpts builds the gRPC dial options for the gateway→gRPC loopback. +// When TLS is configured on the gRPC server the gateway uses matching credentials; +// otherwise it dials plaintext. +func gatewayDialOpts(cfg *config.Config) ([]grpc.DialOption, error) { + if cfg.TLS.Cert == "" { + return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil + } + // Determine which certificate to trust: prefer an explicit CA, fall back to + // the server cert itself (self-signed case). + caPath := cfg.TLS.CA + if caPath == "" { + caPath = cfg.TLS.Cert + } + pem, err := os.ReadFile(caPath) // #nosec G304 -- path comes from config file, not user input + if err != nil { + return nil, fmt.Errorf("read CA cert: %w", err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(pem) { + return nil, fmt.Errorf("invalid CA certificate") + } + tc := credentials.NewTLS(&tls.Config{ + RootCAs: pool, + MinVersion: tls.VersionTLS12, + }) + return []grpc.DialOption{grpc.WithTransportCredentials(tc)}, nil +} diff --git a/internal/server/gateway_test.go b/internal/server/gateway_test.go new file mode 100644 index 0000000..6aa29d8 --- /dev/null +++ b/internal/server/gateway_test.go @@ -0,0 +1,163 @@ +package server_test + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "testing" + + "github.com/rbaliyan/ledger/internal/config" + "github.com/rbaliyan/ledger/internal/server" + "github.com/rbaliyan/ledger/ledgerpb" + "context" + + _ "modernc.org/sqlite" +) + +// newGatewayServer starts a server with both gRPC and HTTP gateway listeners. +func newGatewayServer(t *testing.T) (grpcAddr, httpAddr string) { + t.Helper() + cfg := &config.Config{ + Listen: "127.0.0.1:0", + HTTPListen: "127.0.0.1:0", + DB: config.DBConfig{ + Type: "sqlite", + SQLite: config.SQLiteConfig{Path: ":memory:"}, + }, + } + srv, err := server.New(t.Context(), cfg) + if err != nil { + t.Fatalf("server.New: %v", err) + } + t.Cleanup(func() { srv.Stop(context.Background()) }) + go func() { _ = srv.Serve() }() + return srv.Addr(), srv.HTTPAddr() +} + +func TestGatewayHealth(t *testing.T) { + _, httpAddr := newGatewayServer(t) + + resp, err := http.Get(fmt.Sprintf("http://%s/v1/health", httpAddr)) //nolint:noctx + if err != nil { + t.Fatalf("GET /v1/health: %v", err) + } + defer resp.Body.Close() //nolint:errcheck + if resp.StatusCode != http.StatusOK { + t.Errorf("expected 200, got %d", resp.StatusCode) + } + var body map[string]string + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + t.Fatalf("decode body: %v", err) + } + if body["status"] != "ok" { + t.Errorf("expected status=ok, got %q", body["status"]) + } +} + +func TestGatewayAppendRead(t *testing.T) { + _, httpAddr := newGatewayServer(t) + base := fmt.Sprintf("http://%s", httpAddr) + + payload, _ := json.Marshal(map[string]string{"event": "created"}) + body, _ := json.Marshal(map[string]any{ + "entries": []map[string]any{ + {"payload": payload}, + }, + }) + + req, _ := http.NewRequestWithContext(t.Context(), http.MethodPost, + base+"/v1/streams/order-1/entries", bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + req.Header.Set(ledgerpb.StoreMetadataHeader, "orders") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("POST entries: %v", err) + } + defer resp.Body.Close() //nolint:errcheck + if resp.StatusCode != http.StatusOK { + raw, _ := io.ReadAll(resp.Body) + t.Fatalf("expected 200, got %d: %s", resp.StatusCode, raw) + } + + var appendResp map[string]any + if err := json.NewDecoder(resp.Body).Decode(&appendResp); err != nil { + t.Fatalf("decode append response: %v", err) + } + ids, _ := appendResp["ids"].([]any) + if len(ids) != 1 { + t.Fatalf("expected 1 ID, got %v", appendResp) + } + + // Read back via GET. + req2, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, + base+"/v1/streams/order-1/entries", nil) + req2.Header.Set(ledgerpb.StoreMetadataHeader, "orders") + + resp2, err := http.DefaultClient.Do(req2) + if err != nil { + t.Fatalf("GET entries: %v", err) + } + defer resp2.Body.Close() //nolint:errcheck + if resp2.StatusCode != http.StatusOK { + raw, _ := io.ReadAll(resp2.Body) + t.Fatalf("expected 200, got %d: %s", resp2.StatusCode, raw) + } + var readResp map[string]any + if err := json.NewDecoder(resp2.Body).Decode(&readResp); err != nil { + t.Fatalf("decode read response: %v", err) + } + entries, _ := readResp["entries"].([]any) + if len(entries) != 1 { + t.Errorf("expected 1 entry, got %d", len(entries)) + } +} + +func TestGatewayAPIKeyAuth(t *testing.T) { + cfg := &config.Config{ + Listen: "127.0.0.1:0", + HTTPListen: "127.0.0.1:0", + APIKey: "secret", + DB: config.DBConfig{ + Type: "sqlite", + SQLite: config.SQLiteConfig{Path: ":memory:"}, + }, + } + srv, err := server.New(t.Context(), cfg) + if err != nil { + t.Fatalf("server.New: %v", err) + } + t.Cleanup(func() { srv.Stop(context.Background()) }) + go func() { _ = srv.Serve() }() + + base := fmt.Sprintf("http://%s", srv.HTTPAddr()) + + // No API key — should be rejected. + req, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, + base+"/v1/streams/s/entries:count", nil) + req.Header.Set(ledgerpb.StoreMetadataHeader, "test") + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("request: %v", err) + } + _ = resp.Body.Close() + if resp.StatusCode != http.StatusUnauthorized { + t.Errorf("expected 401, got %d", resp.StatusCode) + } + + // With correct API key — should succeed. + req2, _ := http.NewRequestWithContext(t.Context(), http.MethodGet, + base+"/v1/streams/s/entries:count", nil) + req2.Header.Set(ledgerpb.StoreMetadataHeader, "test") + req2.Header.Set(ledgerpb.APIKeyMetadataHeader, "secret") + resp2, err := http.DefaultClient.Do(req2) + if err != nil { + t.Fatalf("request with key: %v", err) + } + _ = resp2.Body.Close() + if resp2.StatusCode != http.StatusOK { + t.Errorf("expected 200, got %d", resp2.StatusCode) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index b30d6e5..2b417b6 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -9,6 +9,7 @@ import ( "io" "log/slog" "net" + "net/http" "os" "sync" @@ -26,6 +27,9 @@ type Server struct { mux *muxProvider closer io.Closer + httpServer *http.Server // nil when HTTP gateway is disabled + httpListener net.Listener // nil when HTTP gateway is disabled + mu sync.Mutex // protects hooks, hookCancel, and stopped hooks []*hookRunner hookCancel context.CancelFunc @@ -72,6 +76,26 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { hookCancel: hookCancel, } + if cfg.HTTPListen != "" { + httpHandler, err := newGatewayHandler(ctx, ln.Addr().String(), cfg) + if err != nil { + hookCancel() + _ = ln.Close() + mux.Close(ctx) + return nil, fmt.Errorf("server: gateway: %w", err) + } + httpLn, err := net.Listen("tcp", cfg.HTTPListen) + if err != nil { + hookCancel() + _ = ln.Close() + mux.Close(ctx) + return nil, fmt.Errorf("server: http listen %s: %w", cfg.HTTPListen, err) + } + srv.httpServer = &http.Server{Handler: httpHandler} + srv.httpListener = httpLn + slog.Info("ledger HTTP gateway listening", "addr", httpLn.Addr().String()) + } + for _, hcfg := range cfg.Hooks { h, err := newHookRunner(hcfg, mux) if err != nil { @@ -87,9 +111,17 @@ func New(ctx context.Context, cfg *config.Config) (*Server, error) { return srv, nil } -// Addr returns the address the server is listening on. +// Addr returns the gRPC address the server is listening on. func (s *Server) Addr() string { return s.listener.Addr().String() } +// HTTPAddr returns the HTTP gateway address, or empty string if disabled. +func (s *Server) HTTPAddr() string { + if s.httpListener == nil { + return "" + } + return s.httpListener.Addr().String() +} + // ReloadHooks cancels the current hook set, waits for them to stop, then // starts a new set from cfg.Hooks. The gRPC listener and backend connection // are not affected. No-ops if Stop has already been called. @@ -129,8 +161,15 @@ func (s *Server) ReloadHooks(cfg *config.Config) { slog.Info("hooks reloaded", "count", len(hooks)) } -// Serve blocks until the server stops. +// Serve starts the HTTP gateway (if configured) and blocks on the gRPC server. func (s *Server) Serve() error { + if s.httpServer != nil { + go func() { + if err := s.httpServer.Serve(s.httpListener); err != nil && err != http.ErrServerClosed { + slog.Warn("HTTP gateway stopped", "err", err) + } + }() + } return s.grpc.Serve(s.listener) } @@ -152,6 +191,11 @@ func (s *Server) Stop(ctx context.Context) { for _, h := range hooks { h.Wait() } + if s.httpServer != nil { + if err := s.httpServer.Shutdown(ctx); err != nil { + slog.Warn("HTTP gateway shutdown error", "err", err) + } + } s.grpc.GracefulStop() s.mux.Close(ctx) if s.closer != nil { diff --git a/proto/ledger/v1/ledger.proto b/proto/ledger/v1/ledger.proto index 121d9b2..1f6591c 100644 --- a/proto/ledger/v1/ledger.proto +++ b/proto/ledger/v1/ledger.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package ledger.v1; +import "google/api/annotations.proto"; import "google/protobuf/timestamp.proto"; option go_package = "github.com/rbaliyan/ledger/api/ledger/v1;ledgerv1"; @@ -16,47 +17,99 @@ service LedgerService { // Append adds entries to the named stream. // Entries whose DedupKey already exists in the stream are silently skipped. // Returns the IDs assigned to newly written entries (skipped entries are omitted). - rpc Append(AppendRequest) returns (AppendResponse); + rpc Append(AppendRequest) returns (AppendResponse) { + option (google.api.http) = { + post: "/v1/streams/{stream}/entries" + body: "*" + }; + } // Read returns entries from the named stream, ordered by ID. // An empty stream returns an empty list (not an error). - rpc Read(ReadRequest) returns (ReadResponse); + // Query params: options.after, options.limit, options.desc, options.order_key, options.tag, options.all_tags. + rpc Read(ReadRequest) returns (ReadResponse) { + option (google.api.http) = { + get: "/v1/streams/{stream}/entries" + }; + } // Count returns the total number of entries in the named stream. - rpc Count(CountRequest) returns (CountResponse); + rpc Count(CountRequest) returns (CountResponse) { + option (google.api.http) = { + get: "/v1/streams/{stream}/entries:count" + }; + } // SetTags replaces all tags on an existing entry. // Returns NOT_FOUND if the entry does not exist. - rpc SetTags(SetTagsRequest) returns (SetTagsResponse); + rpc SetTags(SetTagsRequest) returns (SetTagsResponse) { + option (google.api.http) = { + put: "/v1/streams/{stream}/entries/{id}/tags" + body: "*" + }; + } // SetAnnotations merges key-value annotations onto an existing entry. // Keys present in "set" are upserted; keys present in "delete" are removed. // Returns NOT_FOUND if the entry does not exist. - rpc SetAnnotations(SetAnnotationsRequest) returns (SetAnnotationsResponse); + rpc SetAnnotations(SetAnnotationsRequest) returns (SetAnnotationsResponse) { + option (google.api.http) = { + patch: "/v1/streams/{stream}/entries/{id}/annotations" + body: "*" + }; + } // Trim deletes entries whose ID is less than or equal to before_id. // Returns the number of entries deleted. - rpc Trim(TrimRequest) returns (TrimResponse); + rpc Trim(TrimRequest) returns (TrimResponse) { + option (google.api.http) = { + post: "/v1/streams/{stream}/entries:trim" + body: "*" + }; + } // ListStreamIDs returns distinct stream IDs that have at least one entry. // Streams that have been fully trimmed are not returned. // Results are ascending by stream ID and cursor-paginated via after / limit. - rpc ListStreamIDs(ListStreamIDsRequest) returns (ListStreamIDsResponse); + // Query params: after, limit. + rpc ListStreamIDs(ListStreamIDsRequest) returns (ListStreamIDsResponse) { + option (google.api.http) = { + get: "/v1/streams" + }; + } // RenameStream changes the human-readable name of a stream without touching // its entries. Returns NOT_FOUND if the current name does not exist. - rpc RenameStream(RenameStreamRequest) returns (RenameStreamResponse); + rpc RenameStream(RenameStreamRequest) returns (RenameStreamResponse) { + option (google.api.http) = { + post: "/v1/streams/{name}:rename" + body: "*" + }; + } // Stat returns metrics for a stream, including entry count and first/last entry IDs. - rpc Stat(StatRequest) returns (StatResponse); + rpc Stat(StatRequest) returns (StatResponse) { + option (google.api.http) = { + get: "/v1/streams/{stream}:stat" + }; + } // Search performs a substring / full-text search on entry payloads. // Returns UNIMPLEMENTED if the backend does not support search. - rpc Search(SearchRequest) returns (SearchResponse); + rpc Search(SearchRequest) returns (SearchResponse) { + option (google.api.http) = { + post: "/v1/streams/{stream}/entries:search" + body: "*" + }; + } // Health reports backend connectivity. Returns status "ok" on success, // or a description of the problem on failure. - rpc Health(HealthRequest) returns (HealthResponse); + rpc Health(HealthRequest) returns (HealthResponse) { + option (google.api.http) = { + get: "/v1/health" + }; + } } // ─── shared types ────────────────────────────────────────────────────────────