Skip to content

Commit f85f02a

Browse files
Add endpoint to filter stat vars by places (#1585)
Refactors the `SearchStatvar` function to separate logic for filtering stat vars by places into its own function/endpoint. This change is required to enable Vertex AI source filtering, which will be calling the new endpoint to use the filtering logic after performing the stat var search independently. I tested the endpoint locally to ensure that the new endpoint works independently, and that the old `SearchStatVar` function still works as intended. From a UX standpoint, the changes are not noticeable. Screencasts demoing the change: [Old StatVar search](http://screencast/cast/NTk3NjE3MDY0NTQyMjA4MHxhNDZhNjgwMy1iMw) (no observed change to current behavior on [prod](https://datacommons.org/tools/statvar)) [New Vertex AI search](http://screencast/cast/NTMwOTU4MjM5Mjk1MDc4NHwyMDBiNmVkYy03YQ) - The difference here is that source selection now works specifically for Vertex AI stat var searches. These changes to the website will be implemented in a separate, follow up PR. But the screencasts above demo how this new endpoint is intended to be used in the website repo.
1 parent 70a3c33 commit f85f02a

File tree

20 files changed

+1326
-1460
lines changed

20 files changed

+1326
-1460
lines changed

cmd/main.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,6 @@ var (
107107
"",
108108
"Which exporter to use for OpenTelemetry metrics. Valid values are otlp, prometheus, and console (or blank for no-op).",
109109
)
110-
v3MirrorPercent = flag.Int(
111-
"v3_mirror_percent", 0, "Percentage of V2 API requests to mirror to V3. Value from 0 to 100.",
112-
)
113110
)
114111

115112
func main() {
@@ -118,13 +115,6 @@ func main() {
118115
flag.Parse()
119116
log.SetFlags(log.LstdFlags | log.Lshortfile)
120117

121-
if *v3MirrorPercent < 0 || *v3MirrorPercent > 100 {
122-
log.Fatalf("v3_mirror_percent must be between 0 and 100, got %d", *v3MirrorPercent)
123-
}
124-
if *v3MirrorPercent > 0 && !*enableV3 {
125-
log.Fatalf("v3_mirror_percent > 0 requires --enable_v3=true")
126-
}
127-
128118
ctx := context.Background()
129119
var err error
130120

@@ -378,7 +368,6 @@ func main() {
378368

379369
// Create server object
380370
mixerServer := server.NewMixerServer(store, metadata, c, mapsClient, dispatcher)
381-
mixerServer.SetV3MirrorPercent(*v3MirrorPercent)
382371
pbs.RegisterMixerServer(srv, mixerServer)
383372

384373
// Subscribe to branch cache update

deploy/Dockerfile

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
# Build docker image used in Cloud Build for deployment.
1616
# This image contains "gcloud", "kubectl", "yq" and "helm"
1717

18-
FROM docker:stable AS docker-cli
1918
FROM google/cloud-sdk:slim
2019

21-
COPY --from=docker-cli /usr/local/bin/docker /usr/bin/docker
22-
2320
RUN apt-get update && \
2421
apt-get install -y kubectl gettext-base google-cloud-sdk-gke-gcloud-auth-plugin && \
2522
curl -L https://github.com/mikefarah/yq/releases/download/v4.27.3/yq_linux_386 -o /usr/bin/yq && \

deploy/apigee/envs/nonprod.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ proxies:
1010
- rewrite-quota-exceeded-message
1111
- set-southbound-key
1212
- strip-api-key-header-and-params
13+
- block-internal-only-endpoints
1314
proxy_endpoints:
1415
- api
1516
target_endpoints:

deploy/apigee/envs/prod.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ proxies:
1010
- rewrite-quota-exceeded-message
1111
- set-southbound-key
1212
- strip-api-key-header-and-params
13+
- block-internal-only-endpoints
1314
proxy_endpoints:
1415
- api
1516
target_endpoints:
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
2+
<RaiseFault async="false" continueOnError="false" enabled="true" name="block-internal-only-endpoints">
3+
<DisplayName>block-internal-only-endpoints</DisplayName>
4+
<Properties/>
5+
<FaultResponse>
6+
<Set>
7+
<Payload contentType="application/json">{"message":"Access to this endpoint is only allowed internally.", "code": 403}</Payload>
8+
<StatusCode>403</StatusCode>
9+
<ReasonPhrase>Forbidden</ReasonPhrase>
10+
</Set>
11+
</FaultResponse>
12+
</RaiseFault>

deploy/apigee/proxies/api.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<Policy>rewrite-quota-exceeded-message</Policy>
1313
<Policy>set-southbound-key</Policy>
1414
<Policy>strip-api-key-header-and-params</Policy>
15+
<Policy>block-internal-only-endpoints</Policy>
1516
</Policies>
1617
<ProxyEndpoints>
1718
<ProxyEndpoint>api</ProxyEndpoint>

deploy/apigee/target_endpoints/api.template.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@
7575
<Name>enforce-quota-limit</Name>
7676
</Step>
7777
</Request>
78+
<Request>
79+
<Step>
80+
<Condition>
81+
proxy.pathsuffix MatchesPath "/v2/variable/filter" AND request.verb == "POST"
82+
</Condition>
83+
<Name>block-internal-only-endpoints</Name>
84+
</Step>
85+
</Request>
7886
</Flow>
7987
</Flows>
8088
<PostFlow name="PostFlow">

internal/merger/merger.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,25 @@ func MergeSearchStatVarResponse(primary, secondary *pb.SearchStatVarResponse) *p
452452
return merged
453453
}
454454

455+
// MergeFilterStatVarsByEntityResponse merges two FilterStatVarsByEntityResponse.
456+
func MergeFilterStatVarsByEntityResponse(primary, secondary *pb.FilterStatVarsByEntityResponse) *pb.FilterStatVarsByEntityResponse {
457+
// TODO: Consider deduping stat vars
458+
mergedStatVars := []*pb.EntityInfo{}
459+
460+
if primary != nil {
461+
mergedStatVars = append(mergedStatVars, primary.StatVars...)
462+
}
463+
if secondary != nil {
464+
mergedStatVars = append(mergedStatVars, secondary.StatVars...)
465+
}
466+
467+
merged := &pb.FilterStatVarsByEntityResponse{
468+
StatVars: mergedStatVars,
469+
}
470+
471+
return merged
472+
}
473+
455474
// Merges multiple V2 NodeSearchResponses.
456475
// Cycles through responses in order of priority and add results one by one.
457476
func MergeMultiNodeSearch(allResp []*pbv2.NodeSearchResponse) (*pbv2.NodeSearchResponse, error) {

internal/merger/merger_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,6 +1578,119 @@ func TestMergeSearchStatVarResponse(t *testing.T) {
15781578
}
15791579
}
15801580

1581+
func TestMergeFilterStatVarsByEntityResponse(t *testing.T) {
1582+
cmpOpts := cmp.Options{protocmp.Transform()}
1583+
for _, tc := range []struct {
1584+
desc string
1585+
primary *pb.FilterStatVarsByEntityResponse
1586+
secondary *pb.FilterStatVarsByEntityResponse
1587+
want *pb.FilterStatVarsByEntityResponse
1588+
}{{
1589+
desc: "primary only",
1590+
primary: &pb.FilterStatVarsByEntityResponse{
1591+
StatVars: []*pb.EntityInfo{
1592+
{
1593+
Name: "sv1",
1594+
Dcid: "svid1",
1595+
},
1596+
{
1597+
Name: "sv2",
1598+
Dcid: "svid2",
1599+
},
1600+
},
1601+
},
1602+
want: &pb.FilterStatVarsByEntityResponse{
1603+
StatVars: []*pb.EntityInfo{
1604+
{
1605+
Name: "sv1",
1606+
Dcid: "svid1",
1607+
},
1608+
{
1609+
Name: "sv2",
1610+
Dcid: "svid2",
1611+
},
1612+
},
1613+
},
1614+
}, {
1615+
desc: "secondary only",
1616+
secondary: &pb.FilterStatVarsByEntityResponse{
1617+
StatVars: []*pb.EntityInfo{
1618+
{
1619+
Name: "sv1",
1620+
Dcid: "svid1",
1621+
},
1622+
{
1623+
Name: "sv2",
1624+
Dcid: "svid2",
1625+
},
1626+
},
1627+
},
1628+
want: &pb.FilterStatVarsByEntityResponse{
1629+
StatVars: []*pb.EntityInfo{
1630+
{
1631+
Name: "sv1",
1632+
Dcid: "svid1",
1633+
},
1634+
{
1635+
Name: "sv2",
1636+
Dcid: "svid2",
1637+
},
1638+
},
1639+
},
1640+
}, {
1641+
desc: "combined",
1642+
primary: &pb.FilterStatVarsByEntityResponse{
1643+
StatVars: []*pb.EntityInfo{
1644+
{
1645+
Name: "sv1",
1646+
Dcid: "svid1",
1647+
},
1648+
{
1649+
Name: "sv2",
1650+
Dcid: "svid2",
1651+
},
1652+
},
1653+
},
1654+
secondary: &pb.FilterStatVarsByEntityResponse{
1655+
StatVars: []*pb.EntityInfo{
1656+
{
1657+
Name: "sv3",
1658+
Dcid: "svid3",
1659+
},
1660+
{
1661+
Name: "sv4",
1662+
Dcid: "svid4",
1663+
},
1664+
},
1665+
},
1666+
want: &pb.FilterStatVarsByEntityResponse{
1667+
StatVars: []*pb.EntityInfo{
1668+
{
1669+
Name: "sv1",
1670+
Dcid: "svid1",
1671+
},
1672+
{
1673+
Name: "sv2",
1674+
Dcid: "svid2",
1675+
},
1676+
{
1677+
Name: "sv3",
1678+
Dcid: "svid3",
1679+
},
1680+
{
1681+
Name: "sv4",
1682+
Dcid: "svid4",
1683+
},
1684+
},
1685+
},
1686+
}} {
1687+
got := MergeFilterStatVarsByEntityResponse(tc.primary, tc.secondary)
1688+
if diff := cmp.Diff(got, tc.want, cmpOpts); diff != "" {
1689+
t.Errorf("%s: got diff: %s", tc.desc, diff)
1690+
}
1691+
}
1692+
}
1693+
15811694
func TestMergeMultiNodeSearch(t *testing.T) {
15821695
cmpOpts := cmp.Options{protocmp.Transform()}
15831696
for _, c := range []struct {

internal/metrics/metrics.go

Lines changed: 1 addition & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import (
3636
"go.opentelemetry.io/otel/metric"
3737
sdk "go.opentelemetry.io/otel/sdk/metric"
3838
"google.golang.org/grpc"
39-
"google.golang.org/grpc/status"
4039
)
4140

4241
const (
@@ -66,8 +65,7 @@ const (
6665
cacheDataTypeAttr = "cachedata.type"
6766

6867
// Common metric attributes
69-
rpcMethodAttr = "rpc.method"
70-
rpcStatusCodeAttr = "rpc.grpc.status_code"
68+
rpcMethodAttr = "rpc.method"
7169

7270
unknownMethodName = "UnknownMethod"
7371
)
@@ -223,45 +221,3 @@ func RecordCachedataRead(ctx context.Context, cacheType string) {
223221
attribute.String(rpcMethodAttr, getRpcMethod(ctx)),
224222
))
225223
}
226-
227-
// RecordV3LatencyDiff records the latency difference between an API call and a
228-
// mirrored equivalent V3 call.
229-
func RecordV3LatencyDiff(ctx context.Context, diff time.Duration) {
230-
latencyDiffHistogram, _ := otel.GetMeterProvider().Meter(meterName).
231-
Int64Histogram("datacommons.mixer.v3_latency_diff",
232-
metric.WithDescription("Difference in latency between mirrored V3 API calls in milliseconds (v3 minus original)"),
233-
metric.WithUnit("ms"))
234-
235-
latencyDiffHistogram.Record(ctx, diff.Milliseconds(),
236-
metric.WithAttributes(
237-
attribute.String(rpcMethodAttr, getRpcMethod(ctx)),
238-
))
239-
}
240-
241-
// RecordV3Mismatch increments a counter for how many times a mirrored V3 call
242-
// returns a different value from the original call.
243-
func RecordV3Mismatch(ctx context.Context) {
244-
mismatchCounter, _ := otel.GetMeterProvider().Meter(meterName).
245-
Int64Counter("datacommons.mixer.v3_response_mismatches",
246-
metric.WithDescription("Count of V3 mirrored response mismatches"),
247-
)
248-
mismatchCounter.Add(ctx, 1,
249-
metric.WithAttributes(
250-
attribute.String(rpcMethodAttr, getRpcMethod(ctx)),
251-
))
252-
}
253-
254-
// RecordV3MirrorError increments a counter for mirrored V3 requests that
255-
// returned an error.
256-
func RecordV3MirrorError(ctx context.Context, err error) {
257-
st, _ := status.FromError(err)
258-
errorCounter, _ := otel.GetMeterProvider().Meter(meterName).
259-
Int64Counter("datacommons.mixer.v3_mirror_errors",
260-
metric.WithDescription("Count of errors encountered during V3 mirroring"),
261-
)
262-
errorCounter.Add(ctx, 1,
263-
metric.WithAttributes(
264-
attribute.String(rpcMethodAttr, getRpcMethod(ctx)),
265-
attribute.String(rpcStatusCodeAttr, st.Code().String()),
266-
))
267-
}

0 commit comments

Comments
 (0)