Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 60 additions & 44 deletions api/ledger/v1/ledger.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions clickhouse/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ func (s *Store) Read(ctx context.Context, stream string, opts ...ledger.ReadOpti
sb.WriteString(" AND has(JSONExtractArrayRaw(tags), ?)")
args = append(args, fmt.Sprintf(`"%s"`, tag))
}
for _, kv := range o.MetadataFilters() {
sb.WriteString(" AND JSONExtractString(metadata, ?) = ?")
args = append(args, kv.Key, kv.Value)
}

if o.Order() == ledger.Descending {
sb.WriteString(" ORDER BY id DESC")
Expand Down
3 changes: 3 additions & 0 deletions ledgerpb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ func (o ReadOptions) commonOpts() []ledger.ReadOption {
if len(o.AllTags) > 0 {
opts = append(opts, ledger.WithAllTags(o.AllTags...))
}
for k, v := range o.MetadataFilters {
opts = append(opts, ledger.WithMetadataKey(k, v))
}
return opts
}

Expand Down
13 changes: 7 additions & 6 deletions ledgerpb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ const (
// It mirrors ledger.ReadOptions but uses plain Go values instead of functional
// options so adapters can inspect the cursor without type assertions.
type ReadOptions struct {
After string // cursor: only entries with ID > After are returned; "" means start
Limit int // 0 means backend default (100)
Desc bool // newest-first when true
OrderKey string // filter by order_key field
Tag string // filter entries that carry this single tag
AllTags []string // filter entries that carry ALL of these tags
After string // cursor: only entries with ID > After are returned; "" means start
Limit int // 0 means backend default (100)
Desc bool // newest-first when true
OrderKey string // filter by order_key field
Tag string // filter entries that carry this single tag
AllTags []string // filter entries that carry ALL of these tags
MetadataFilters map[string]string // filter entries whose metadata contains all key-value pairs (ANDed)
}

// InputEntry is a single entry to be appended to a stream.
Expand Down
13 changes: 7 additions & 6 deletions ledgerpb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,13 @@ func readOptionsFromProto(p *ledgerv1.ReadOptions) ReadOptions {
return ReadOptions{}
}
return ReadOptions{
After: p.After,
Limit: int(p.Limit), //nolint:gosec // proto int64 fits in int
Desc: p.Desc,
OrderKey: p.OrderKey,
Tag: p.Tag,
AllTags: p.AllTags,
After: p.After,
Limit: int(p.Limit), //nolint:gosec // proto int64 fits in int
Desc: p.Desc,
OrderKey: p.OrderKey,
Tag: p.Tag,
AllTags: p.AllTags,
MetadataFilters: p.MetadataFilters,
}
}

Expand Down
3 changes: 3 additions & 0 deletions mongodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,9 @@ func (s *Store) Read(ctx context.Context, stream string, opts ...ledger.ReadOpti
if allTags := o.AllTags(); len(allTags) > 0 {
filter = append(filter, bson.E{Key: "tags", Value: bson.D{{Key: "$all", Value: allTags}}})
}
for _, kv := range o.MetadataFilters() {
filter = append(filter, bson.E{Key: "metadata." + kv.Key, Value: kv.Value})
}

sortDir := 1
if o.Order() == ledger.Descending {
Expand Down
4 changes: 4 additions & 0 deletions proto/ledger/v1/ledger.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ message ReadOptions {

// all_tags filters entries that carry ALL of the specified tags.
repeated string all_tags = 6;

// metadata_filters filters entries whose metadata map contains all of the
// given key-value pairs. Pairs are ANDed together.
map<string, string> metadata_filters = 7;
}

// ─── Append ──────────────────────────────────────────────────────────────────
Expand Down
4 changes: 4 additions & 0 deletions sqlite/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ func (s *Store) Read(ctx context.Context, stream string, opts ...ledger.ReadOpti
clauses = append(clauses, "EXISTS (SELECT 1 FROM json_each(tags) WHERE json_each.value = ?)")
args = append(args, tag)
}
for _, kv := range o.MetadataFilters() {
clauses = append(clauses, "EXISTS (SELECT 1 FROM json_each(metadata) WHERE json_each.key = ? AND json_each.value = ?)")
args = append(args, kv.Key, kv.Value)
}

dir := "ASC"
if o.Order() == ledger.Descending {
Expand Down
5 changes: 1 addition & 4 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,7 @@ func WithAllTags(tags ...string) ReadOption {

// WithMetadataKey returns a ReadOption that filters entries whose metadata map
// contains the given key with the given value. Multiple calls are ANDed.
//
// Only the PostgreSQL backend implements this filter. SQLite, MongoDB, and
// ClickHouse backends silently ignore it (no error is returned by the library
// layer; backend-specific checking is the caller's responsibility).
// All backends (SQLite, PostgreSQL, MongoDB, ClickHouse) support this filter.
func WithMetadataKey(key, value string) ReadOption {
return func(o *ReadOptions) {
o.metadataFilters = append(o.metadataFilters, metadataKV{Key: key, Value: value})
Expand Down
52 changes: 52 additions & 0 deletions storetest/storetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,58 @@ func RunStoreTests[I comparable, P any](t *testing.T, store ledger.Store[I, P],
}
})

t.Run("WithMetadataKeyFilter", func(t *testing.T) {
for _, meta := range []map[string]string{
{"source": "producer-a", "env": "prod"},
{"source": "producer-b", "env": "prod"},
{"source": "producer-a", "env": "staging"},
} {
if _, err := store.Append(ctx, "test-meta-filter", ledger.RawEntry[P]{
Payload: cfg.SamplePayload,
Metadata: meta,
}); err != nil {
t.Fatalf("append: %v", err)
}
}

entries, err := store.Read(ctx, "test-meta-filter", ledger.WithMetadataKey("source", "producer-a"))
if err != nil {
t.Fatalf("WithMetadataKey: %v", err)
}
if len(entries) != 2 {
t.Errorf("WithMetadataKey(source=producer-a): got %d, want 2", len(entries))
}

entries, err = store.Read(ctx, "test-meta-filter", ledger.WithMetadataKey("env", "prod"))
if err != nil {
t.Fatalf("WithMetadataKey env: %v", err)
}
if len(entries) != 2 {
t.Errorf("WithMetadataKey(env=prod): got %d, want 2", len(entries))
}

// AND: both conditions, only one entry matches
entries, err = store.Read(ctx, "test-meta-filter",
ledger.WithMetadataKey("source", "producer-a"),
ledger.WithMetadataKey("env", "prod"),
)
if err != nil {
t.Fatalf("WithMetadataKey AND: %v", err)
}
if len(entries) != 1 {
t.Errorf("WithMetadataKey AND: got %d, want 1", len(entries))
}

// Non-matching value returns empty
entries, err = store.Read(ctx, "test-meta-filter", ledger.WithMetadataKey("source", "producer-c"))
if err != nil {
t.Fatalf("WithMetadataKey no match: %v", err)
}
if len(entries) != 0 {
t.Errorf("WithMetadataKey(source=producer-c): got %d, want 0", len(entries))
}
})

// Stream-name prefix chosen so these streams sort after all other test streams
// in this suite, allowing ListStreamIDs tests to isolate their results via cursor.
const listPrefix = "zzz-list-"
Expand Down
Loading