From e6a78bc3f7104a43d3237b71725c0e082f747836 Mon Sep 17 00:00:00 2001 From: Rajeev Baliyan <20521486+rbaliyan@users.noreply.github.com> Date: Fri, 24 Apr 2026 12:37:20 +0530 Subject: [PATCH] feat: implement WithMetadataKey filter across all backends SQLite uses json_each to match key-value pairs in the metadata column. MongoDB appends dot-notation filters on the metadata subdocument. ClickHouse uses JSONExtractString for key lookup. The proto ReadOptions message gains a metadata_filters map field (field 7). The ledgerpb adapter translates map entries to WithMetadataKey calls so the gRPC/HTTP gateway surfaces the same filter. Conformance test added to storetest covering single-key filter, AND of two keys, and non-matching value. --- api/ledger/v1/ledger.pb.go | 104 ++++++++++++++++++++--------------- clickhouse/store.go | 4 ++ ledgerpb/adapter.go | 3 + ledgerpb/provider.go | 13 +++-- ledgerpb/server.go | 13 +++-- mongodb/store.go | 3 + proto/ledger/v1/ledger.proto | 4 ++ sqlite/store.go | 4 ++ store.go | 5 +- storetest/storetest.go | 52 ++++++++++++++++++ 10 files changed, 145 insertions(+), 60 deletions(-) diff --git a/api/ledger/v1/ledger.pb.go b/api/ledger/v1/ledger.pb.go index 647d027..7298a0d 100644 --- a/api/ledger/v1/ledger.pb.go +++ b/api/ledger/v1/ledger.pb.go @@ -271,9 +271,12 @@ type ReadOptions struct { // tag filters entries that carry this single tag. Tag string `protobuf:"bytes,5,opt,name=tag,proto3" json:"tag,omitempty"` // all_tags filters entries that carry ALL of the specified tags. - AllTags []string `protobuf:"bytes,6,rep,name=all_tags,json=allTags,proto3" json:"all_tags,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + AllTags []string `protobuf:"bytes,6,rep,name=all_tags,json=allTags,proto3" json:"all_tags,omitempty"` + // metadata_filters filters entries whose metadata map contains all of the + // given key-value pairs. Pairs are ANDed together. 
+ MetadataFilters map[string]string `protobuf:"bytes,7,rep,name=metadata_filters,json=metadataFilters,proto3" json:"metadata_filters,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ReadOptions) Reset() { @@ -348,6 +351,13 @@ func (x *ReadOptions) GetAllTags() []string { return nil } +func (x *ReadOptions) GetMetadataFilters() map[string]string { + if x != nil { + return x.MetadataFilters + } + return nil +} + type AppendRequest struct { state protoimpl.MessageState `protogen:"open.v1"` // stream is the stream to append to. Must not be empty. @@ -1466,14 +1476,18 @@ const file_ledger_v1_ledger_proto_rawDesc = "" + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\x1a>\n" + "\x10AnnotationsEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\x97\x01\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"\xb3\x02\n" + "\vReadOptions\x12\x14\n" + "\x05after\x18\x01 \x01(\tR\x05after\x12\x14\n" + "\x05limit\x18\x02 \x01(\x03R\x05limit\x12\x12\n" + "\x04desc\x18\x03 \x01(\bR\x04desc\x12\x1b\n" + "\torder_key\x18\x04 \x01(\tR\borderKey\x12\x10\n" + "\x03tag\x18\x05 \x01(\tR\x03tag\x12\x19\n" + - "\ball_tags\x18\x06 \x03(\tR\aallTags\"X\n" + + "\ball_tags\x18\x06 \x03(\tR\aallTags\x12V\n" + + "\x10metadata_filters\x18\a \x03(\v2+.ledger.v1.ReadOptions.MetadataFiltersEntryR\x0fmetadataFilters\x1aB\n" + + "\x14MetadataFiltersEntry\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" + + "\x05value\x18\x02 \x01(\tR\x05value:\x028\x01\"X\n" + "\rAppendRequest\x12\x16\n" + "\x06stream\x18\x01 \x01(\tR\x06stream\x12/\n" + "\aentries\x18\x02 \x03(\v2\x15.ledger.v1.EntryInputR\aentries\"\"\n" + @@ -1559,7 +1573,7 @@ func file_ledger_v1_ledger_proto_rawDescGZIP() []byte { return file_ledger_v1_ledger_proto_rawDescData } -var file_ledger_v1_ledger_proto_msgTypes = make([]protoimpl.MessageInfo, 29) 
+var file_ledger_v1_ledger_proto_msgTypes = make([]protoimpl.MessageInfo, 30) var file_ledger_v1_ledger_proto_goTypes = []any{ (*EntryInput)(nil), // 0: ledger.v1.EntryInput (*Entry)(nil), // 1: ledger.v1.Entry @@ -1589,48 +1603,50 @@ var file_ledger_v1_ledger_proto_goTypes = []any{ nil, // 25: ledger.v1.EntryInput.MetadataEntry nil, // 26: ledger.v1.Entry.MetadataEntry nil, // 27: ledger.v1.Entry.AnnotationsEntry - nil, // 28: ledger.v1.SetAnnotationsRequest.SetEntry - (*timestamppb.Timestamp)(nil), // 29: google.protobuf.Timestamp + nil, // 28: ledger.v1.ReadOptions.MetadataFiltersEntry + nil, // 29: ledger.v1.SetAnnotationsRequest.SetEntry + (*timestamppb.Timestamp)(nil), // 30: google.protobuf.Timestamp } var file_ledger_v1_ledger_proto_depIdxs = []int32{ 25, // 0: ledger.v1.EntryInput.metadata:type_name -> ledger.v1.EntryInput.MetadataEntry 26, // 1: ledger.v1.Entry.metadata:type_name -> ledger.v1.Entry.MetadataEntry 27, // 2: ledger.v1.Entry.annotations:type_name -> ledger.v1.Entry.AnnotationsEntry - 29, // 3: ledger.v1.Entry.created_at:type_name -> google.protobuf.Timestamp - 29, // 4: ledger.v1.Entry.updated_at:type_name -> google.protobuf.Timestamp - 0, // 5: ledger.v1.AppendRequest.entries:type_name -> ledger.v1.EntryInput - 2, // 6: ledger.v1.ReadRequest.options:type_name -> ledger.v1.ReadOptions - 1, // 7: ledger.v1.ReadResponse.entries:type_name -> ledger.v1.Entry - 28, // 8: ledger.v1.SetAnnotationsRequest.set:type_name -> ledger.v1.SetAnnotationsRequest.SetEntry - 2, // 9: ledger.v1.SearchRequest.options:type_name -> ledger.v1.ReadOptions - 1, // 10: ledger.v1.SearchResponse.entries:type_name -> ledger.v1.Entry - 3, // 11: ledger.v1.LedgerService.Append:input_type -> ledger.v1.AppendRequest - 5, // 12: ledger.v1.LedgerService.Read:input_type -> ledger.v1.ReadRequest - 7, // 13: ledger.v1.LedgerService.Count:input_type -> ledger.v1.CountRequest - 9, // 14: ledger.v1.LedgerService.SetTags:input_type -> ledger.v1.SetTagsRequest - 11, // 15: 
ledger.v1.LedgerService.SetAnnotations:input_type -> ledger.v1.SetAnnotationsRequest - 13, // 16: ledger.v1.LedgerService.Trim:input_type -> ledger.v1.TrimRequest - 15, // 17: ledger.v1.LedgerService.ListStreamIDs:input_type -> ledger.v1.ListStreamIDsRequest - 17, // 18: ledger.v1.LedgerService.RenameStream:input_type -> ledger.v1.RenameStreamRequest - 19, // 19: ledger.v1.LedgerService.Stat:input_type -> ledger.v1.StatRequest - 21, // 20: ledger.v1.LedgerService.Search:input_type -> ledger.v1.SearchRequest - 23, // 21: ledger.v1.LedgerService.Health:input_type -> ledger.v1.HealthRequest - 4, // 22: ledger.v1.LedgerService.Append:output_type -> ledger.v1.AppendResponse - 6, // 23: ledger.v1.LedgerService.Read:output_type -> ledger.v1.ReadResponse - 8, // 24: ledger.v1.LedgerService.Count:output_type -> ledger.v1.CountResponse - 10, // 25: ledger.v1.LedgerService.SetTags:output_type -> ledger.v1.SetTagsResponse - 12, // 26: ledger.v1.LedgerService.SetAnnotations:output_type -> ledger.v1.SetAnnotationsResponse - 14, // 27: ledger.v1.LedgerService.Trim:output_type -> ledger.v1.TrimResponse - 16, // 28: ledger.v1.LedgerService.ListStreamIDs:output_type -> ledger.v1.ListStreamIDsResponse - 18, // 29: ledger.v1.LedgerService.RenameStream:output_type -> ledger.v1.RenameStreamResponse - 20, // 30: ledger.v1.LedgerService.Stat:output_type -> ledger.v1.StatResponse - 22, // 31: ledger.v1.LedgerService.Search:output_type -> ledger.v1.SearchResponse - 24, // 32: ledger.v1.LedgerService.Health:output_type -> ledger.v1.HealthResponse - 22, // [22:33] is the sub-list for method output_type - 11, // [11:22] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for extension extendee - 0, // [0:11] is the sub-list for field type_name + 30, // 3: ledger.v1.Entry.created_at:type_name -> google.protobuf.Timestamp + 30, // 4: ledger.v1.Entry.updated_at:type_name -> google.protobuf.Timestamp + 28, // 5: 
ledger.v1.ReadOptions.metadata_filters:type_name -> ledger.v1.ReadOptions.MetadataFiltersEntry + 0, // 6: ledger.v1.AppendRequest.entries:type_name -> ledger.v1.EntryInput + 2, // 7: ledger.v1.ReadRequest.options:type_name -> ledger.v1.ReadOptions + 1, // 8: ledger.v1.ReadResponse.entries:type_name -> ledger.v1.Entry + 29, // 9: ledger.v1.SetAnnotationsRequest.set:type_name -> ledger.v1.SetAnnotationsRequest.SetEntry + 2, // 10: ledger.v1.SearchRequest.options:type_name -> ledger.v1.ReadOptions + 1, // 11: ledger.v1.SearchResponse.entries:type_name -> ledger.v1.Entry + 3, // 12: ledger.v1.LedgerService.Append:input_type -> ledger.v1.AppendRequest + 5, // 13: ledger.v1.LedgerService.Read:input_type -> ledger.v1.ReadRequest + 7, // 14: ledger.v1.LedgerService.Count:input_type -> ledger.v1.CountRequest + 9, // 15: ledger.v1.LedgerService.SetTags:input_type -> ledger.v1.SetTagsRequest + 11, // 16: ledger.v1.LedgerService.SetAnnotations:input_type -> ledger.v1.SetAnnotationsRequest + 13, // 17: ledger.v1.LedgerService.Trim:input_type -> ledger.v1.TrimRequest + 15, // 18: ledger.v1.LedgerService.ListStreamIDs:input_type -> ledger.v1.ListStreamIDsRequest + 17, // 19: ledger.v1.LedgerService.RenameStream:input_type -> ledger.v1.RenameStreamRequest + 19, // 20: ledger.v1.LedgerService.Stat:input_type -> ledger.v1.StatRequest + 21, // 21: ledger.v1.LedgerService.Search:input_type -> ledger.v1.SearchRequest + 23, // 22: ledger.v1.LedgerService.Health:input_type -> ledger.v1.HealthRequest + 4, // 23: ledger.v1.LedgerService.Append:output_type -> ledger.v1.AppendResponse + 6, // 24: ledger.v1.LedgerService.Read:output_type -> ledger.v1.ReadResponse + 8, // 25: ledger.v1.LedgerService.Count:output_type -> ledger.v1.CountResponse + 10, // 26: ledger.v1.LedgerService.SetTags:output_type -> ledger.v1.SetTagsResponse + 12, // 27: ledger.v1.LedgerService.SetAnnotations:output_type -> ledger.v1.SetAnnotationsResponse + 14, // 28: ledger.v1.LedgerService.Trim:output_type -> 
ledger.v1.TrimResponse + 16, // 29: ledger.v1.LedgerService.ListStreamIDs:output_type -> ledger.v1.ListStreamIDsResponse + 18, // 30: ledger.v1.LedgerService.RenameStream:output_type -> ledger.v1.RenameStreamResponse + 20, // 31: ledger.v1.LedgerService.Stat:output_type -> ledger.v1.StatResponse + 22, // 32: ledger.v1.LedgerService.Search:output_type -> ledger.v1.SearchResponse + 24, // 33: ledger.v1.LedgerService.Health:output_type -> ledger.v1.HealthResponse + 23, // [23:34] is the sub-list for method output_type + 12, // [12:23] is the sub-list for method input_type + 12, // [12:12] is the sub-list for extension type_name + 12, // [12:12] is the sub-list for extension extendee + 0, // [0:12] is the sub-list for field type_name } func init() { file_ledger_v1_ledger_proto_init() } @@ -1644,7 +1660,7 @@ func file_ledger_v1_ledger_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_ledger_v1_ledger_proto_rawDesc), len(file_ledger_v1_ledger_proto_rawDesc)), NumEnums: 0, - NumMessages: 29, + NumMessages: 30, NumExtensions: 0, NumServices: 1, }, diff --git a/clickhouse/store.go b/clickhouse/store.go index 5778b0d..0a463a2 100644 --- a/clickhouse/store.go +++ b/clickhouse/store.go @@ -236,6 +236,10 @@ func (s *Store) Read(ctx context.Context, stream string, opts ...ledger.ReadOpti sb.WriteString(" AND has(JSONExtractArrayRaw(tags), ?)") args = append(args, fmt.Sprintf(`"%s"`, tag)) } + for _, kv := range o.MetadataFilters() { + sb.WriteString(" AND JSONExtractString(metadata, ?) 
= ?") + args = append(args, kv.Key, kv.Value) + } if o.Order() == ledger.Descending { sb.WriteString(" ORDER BY id DESC") diff --git a/ledgerpb/adapter.go b/ledgerpb/adapter.go index 1e40de0..7f3ba94 100644 --- a/ledgerpb/adapter.go +++ b/ledgerpb/adapter.go @@ -308,6 +308,9 @@ func (o ReadOptions) commonOpts() []ledger.ReadOption { if len(o.AllTags) > 0 { opts = append(opts, ledger.WithAllTags(o.AllTags...)) } + for k, v := range o.MetadataFilters { + opts = append(opts, ledger.WithMetadataKey(k, v)) + } return opts } diff --git a/ledgerpb/provider.go b/ledgerpb/provider.go index 947507a..6c7b614 100644 --- a/ledgerpb/provider.go +++ b/ledgerpb/provider.go @@ -23,12 +23,13 @@ const ( // It mirrors ledger.ReadOptions but uses plain Go values instead of functional // options so adapters can inspect the cursor without type assertions. type ReadOptions struct { - After string // cursor: only entries with ID > After are returned; "" means start - Limit int // 0 means backend default (100) - Desc bool // newest-first when true - OrderKey string // filter by order_key field - Tag string // filter entries that carry this single tag - AllTags []string // filter entries that carry ALL of these tags + After string // cursor: only entries with ID > After are returned; "" means start + Limit int // 0 means backend default (100) + Desc bool // newest-first when true + OrderKey string // filter by order_key field + Tag string // filter entries that carry this single tag + AllTags []string // filter entries that carry ALL of these tags + MetadataFilters map[string]string // filter entries whose metadata contains all key-value pairs (ANDed) } // InputEntry is a single entry to be appended to a stream. 
diff --git a/ledgerpb/server.go b/ledgerpb/server.go index 6ad39a0..0506c7d 100644 --- a/ledgerpb/server.go +++ b/ledgerpb/server.go @@ -244,12 +244,13 @@ func readOptionsFromProto(p *ledgerv1.ReadOptions) ReadOptions { return ReadOptions{} } return ReadOptions{ - After: p.After, - Limit: int(p.Limit), //nolint:gosec // proto int64 fits in int - Desc: p.Desc, - OrderKey: p.OrderKey, - Tag: p.Tag, - AllTags: p.AllTags, + After: p.After, + Limit: int(p.Limit), //nolint:gosec // proto int64 fits in int + Desc: p.Desc, + OrderKey: p.OrderKey, + Tag: p.Tag, + AllTags: p.AllTags, + MetadataFilters: p.MetadataFilters, } } diff --git a/mongodb/store.go b/mongodb/store.go index 818e0a1..a898a45 100644 --- a/mongodb/store.go +++ b/mongodb/store.go @@ -332,6 +332,9 @@ func (s *Store) Read(ctx context.Context, stream string, opts ...ledger.ReadOpti if allTags := o.AllTags(); len(allTags) > 0 { filter = append(filter, bson.E{Key: "tags", Value: bson.D{{Key: "$all", Value: allTags}}}) } + for _, kv := range o.MetadataFilters() { + filter = append(filter, bson.E{Key: "metadata." + kv.Key, Value: kv.Value}) + } sortDir := 1 if o.Order() == ledger.Descending { diff --git a/proto/ledger/v1/ledger.proto b/proto/ledger/v1/ledger.proto index 1f6591c..55bcc03 100644 --- a/proto/ledger/v1/ledger.proto +++ b/proto/ledger/v1/ledger.proto @@ -196,6 +196,10 @@ message ReadOptions { // all_tags filters entries that carry ALL of the specified tags. repeated string all_tags = 6; + + // metadata_filters filters entries whose metadata map contains all of the + // given key-value pairs. Pairs are ANDed together. 
+ map<string, string> metadata_filters = 7; } // ─── Append ────────────────────────────────────────────────────────────────── diff --git a/sqlite/store.go b/sqlite/store.go index da69237..be038c1 100644 --- a/sqlite/store.go +++ b/sqlite/store.go @@ -381,6 +381,10 @@ func (s *Store) Read(ctx context.Context, stream string, opts ...ledger.ReadOpti clauses = append(clauses, "EXISTS (SELECT 1 FROM json_each(tags) WHERE json_each.value = ?)") args = append(args, tag) } + for _, kv := range o.MetadataFilters() { + clauses = append(clauses, "EXISTS (SELECT 1 FROM json_each(metadata) WHERE json_each.key = ? AND json_each.value = ?)") + args = append(args, kv.Key, kv.Value) + } dir := "ASC" if o.Order() == ledger.Descending { diff --git a/store.go b/store.go index 1f21e0f..33449cc 100644 --- a/store.go +++ b/store.go @@ -281,10 +281,7 @@ func WithAllTags(tags ...string) ReadOption { // WithMetadataKey returns a ReadOption that filters entries whose metadata map // contains the given key with the given value. Multiple calls are ANDed. -// -// Only the PostgreSQL backend implements this filter. SQLite, MongoDB, and -// ClickHouse backends silently ignore it (no error is returned by the library -// layer; backend-specific checking is the caller's responsibility). +// All backends (SQLite, PostgreSQL, MongoDB, ClickHouse) support this filter. 
func WithMetadataKey(key, value string) ReadOption { return func(o *ReadOptions) { o.metadataFilters = append(o.metadataFilters, metadataKV{Key: key, Value: value}) diff --git a/storetest/storetest.go b/storetest/storetest.go index 5d5918a..66d157d 100644 --- a/storetest/storetest.go +++ b/storetest/storetest.go @@ -418,6 +418,58 @@ func RunStoreTests[I comparable, P any](t *testing.T, store ledger.Store[I, P], } }) + t.Run("WithMetadataKeyFilter", func(t *testing.T) { + for _, meta := range []map[string]string{ + {"source": "producer-a", "env": "prod"}, + {"source": "producer-b", "env": "prod"}, + {"source": "producer-a", "env": "staging"}, + } { + if _, err := store.Append(ctx, "test-meta-filter", ledger.RawEntry[P]{ + Payload: cfg.SamplePayload, + Metadata: meta, + }); err != nil { + t.Fatalf("append: %v", err) + } + } + + entries, err := store.Read(ctx, "test-meta-filter", ledger.WithMetadataKey("source", "producer-a")) + if err != nil { + t.Fatalf("WithMetadataKey: %v", err) + } + if len(entries) != 2 { + t.Errorf("WithMetadataKey(source=producer-a): got %d, want 2", len(entries)) + } + + entries, err = store.Read(ctx, "test-meta-filter", ledger.WithMetadataKey("env", "prod")) + if err != nil { + t.Fatalf("WithMetadataKey env: %v", err) + } + if len(entries) != 2 { + t.Errorf("WithMetadataKey(env=prod): got %d, want 2", len(entries)) + } + + // AND: both conditions, only one entry matches + entries, err = store.Read(ctx, "test-meta-filter", + ledger.WithMetadataKey("source", "producer-a"), + ledger.WithMetadataKey("env", "prod"), + ) + if err != nil { + t.Fatalf("WithMetadataKey AND: %v", err) + } + if len(entries) != 1 { + t.Errorf("WithMetadataKey AND: got %d, want 1", len(entries)) + } + + // Non-matching value returns empty + entries, err = store.Read(ctx, "test-meta-filter", ledger.WithMetadataKey("source", "producer-c")) + if err != nil { + t.Fatalf("WithMetadataKey no match: %v", err) + } + if len(entries) != 0 { + 
t.Errorf("WithMetadataKey(source=producer-c): got %d, want 0", len(entries)) + } + }) + // Stream-name prefix chosen so these streams sort after all other test streams // in this suite, allowing ListStreamIDs tests to isolate their results via cursor. const listPrefix = "zzz-list-"