Skip to content

Commit

Permalink
watch: support -filter for consul watch: checks, services, nodes, ser…
Browse files Browse the repository at this point in the history
…vice (#17780)

* watch: support -filter for watch checks

* Add filter for watch nodes, services, and service
- unit test added
- Add changelog
- update doc
  • Loading branch information
huikang committed Jun 23, 2023
1 parent b782f2e commit f16c5d8
Show file tree
Hide file tree
Showing 6 changed files with 575 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/17780.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
cli: `consul watch` command uses `-filter` expression to filter response from checks, services, nodes, and service.
```
6 changes: 6 additions & 0 deletions agent/consul/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,5 +1767,11 @@ func TestHealth_RPC_Filter(t *testing.T) {
out = new(structs.IndexedHealthChecks)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
require.Len(t, out.HealthChecks, 1)

args.State = api.HealthAny
args.Filter = "connect in ServiceTags and v2 in ServiceTags"
out = new(structs.IndexedHealthChecks)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
require.Len(t, out.HealthChecks, 1)
})
}
29 changes: 28 additions & 1 deletion api/watch/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
// servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}

fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
services, meta, err := catalog.Services(&opts)
if err != nil {
Expand All @@ -112,13 +119,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
// nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}

fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
Expand All @@ -132,9 +146,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
// serviceWatch is used to watch a specific service for changes
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}

var (
service string
Expand All @@ -158,6 +176,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
health := p.client.Health()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
if err != nil {
Expand All @@ -175,13 +196,16 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
return nil, err
}

var service, state string
var service, state, filter string
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if err := assignValue(params, "state", &state); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
if service != "" && state != "" {
return nil, fmt.Errorf("Cannot specify service and state")
}
Expand All @@ -196,6 +220,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
if filter != "" {
opts.Filter = filter
}
if state != "" {
checks, meta, err = health.State(state, &opts)
} else {
Expand Down
Loading

0 comments on commit f16c5d8

Please sign in to comment.