Skip to content

Commit 5dc3c82

Browse files
authored
filter observations from spanner for existence requests (#1561)
This makes the results returned from Spanner smaller for existence requests, which don't need the observations themselves. (Note: some of the test goldens were updated with temperature data because I added that data to the test instance; those updates are unrelated to this change.)
1 parent 3f236ed commit 5dc3c82

File tree

11 files changed

+261
-101
lines changed

11 files changed

+261
-101
lines changed

internal/server/spanner/datasource.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,18 +103,19 @@ func (sds *SpannerDataSource) Observation(ctx context.Context, req *pbv2.Observa
103103
date := req.Date
104104
var observations []*Observation
105105
var err error
106+
filterObs := isExistenceRequest(req.Select)
106107

107108
if entityExpr != "" {
108109
containedInPlace, err := v2.ParseContainedInPlace(entityExpr)
109110
if err != nil {
110111
return nil, fmt.Errorf("error getting observations (contained in): %v", err)
111112
}
112-
observations, err = sds.client.GetObservationsContainedInPlace(ctx, variables, containedInPlace, date)
113+
observations, err = sds.client.GetObservationsContainedInPlace(ctx, variables, containedInPlace, date, filterObs)
113114
if err != nil {
114115
return nil, fmt.Errorf("error getting observations (contained in): %v", err)
115116
}
116117
} else {
117-
observations, err = sds.client.GetObservations(ctx, variables, entities, date)
118+
observations, err = sds.client.GetObservations(ctx, variables, entities, date, filterObs)
118119
if err != nil {
119120
return nil, fmt.Errorf("error getting observations: %v", err)
120121
}

internal/server/spanner/dsutil.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,28 +216,44 @@ func selectFieldsToQueryOptions(selectFields []string) queryOptions {
216216
return qo
217217
}
218218

219-
// Whether to return all observations in the Observation response.
220-
func queryObs(qo *queryOptions) bool {
219+
// Whether the queryOptions are for a full observation request.
220+
func isObservationRequest(qo *queryOptions) bool {
221221
return qo.date && qo.value
222222
}
223223

224-
func buildBaseObsStatement(variables []string, entities []string, date string) spanner.Statement {
224+
// Whether the queryOptions are for an existence request.
225+
func isExistenceRequest(selectFields []string) bool {
226+
qo := selectFieldsToQueryOptions(selectFields)
227+
return !isObservationRequest(&qo) && !qo.facet
228+
}
229+
230+
func buildBaseObsStatement(variables []string, entities []string, date string, filterObs bool) spanner.Statement {
225231
stmt := spanner.Statement{
226232
Params: map[string]interface{}{},
227233
}
228234
filters := []string{}
229235

236+
var baseStmt string
237+
var obsStmt string
230238
switch date {
231239
case "":
232-
stmt.SQL = statements.getAllObs
240+
baseStmt = statements.getObs
241+
obsStmt = statements.allObs
233242
case shared.LATEST:
234-
stmt.SQL = statements.getLatestObs
243+
baseStmt = statements.getObs
244+
obsStmt = statements.latestObs
235245
default:
236-
stmt.SQL = statements.getDateObs
246+
baseStmt = statements.getDateObs
237247
stmt.Params["date"] = fmt.Sprintf("$.%s", date)
248+
obsStmt = statements.dateObs
238249
filters = append(filters, statements.selectDate)
239250
}
240251

252+
if filterObs {
253+
obsStmt = statements.emptyObs
254+
}
255+
stmt.SQL = fmt.Sprintf(baseStmt, obsStmt)
256+
241257
if len(variables) > 0 {
242258
stmt.Params["variables"] = variables
243259
filters = append(filters, statements.selectVariableDcids)
@@ -272,7 +288,7 @@ func observationsToObservationResponse(req *pbv2.ObservationRequest, observation
272288
// For now, V3 will match the behavior of V2 to preserve backward compatibility and allow datasource merging.
273289
// TODO: Unify these responses more.
274290
qo := selectFieldsToQueryOptions(req.Select)
275-
if queryObs(&qo) {
291+
if isObservationRequest(&qo) {
276292
// Returns FacetObservations with PointStats.
277293
return obsToObsResponse(req, observations)
278294
} else if qo.facet {

internal/server/spanner/golden/query/get_observations_entity.json

Lines changed: 18 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,7 @@
33
"VariableMeasured": "Count_HeatTemperatureEvent",
44
"ObservationAbout": "wikidataId/Q341968",
55
"Observations": {
6-
"Observations": [
7-
{
8-
"Date": "2022-09",
9-
"Value": "1"
10-
},
11-
{
12-
"Date": "2022-10",
13-
"Value": "2"
14-
}
15-
]
6+
"Observations": []
167
},
178
"Provenance": "dc/base/TemperatureEvents_Agg",
189
"ObservationPeriod": "P1M",
@@ -26,12 +17,7 @@
2617
"VariableMeasured": "Count_HeatTemperatureEvent",
2718
"ObservationAbout": "wikidataId/Q341968",
2819
"Observations": {
29-
"Observations": [
30-
{
31-
"Date": "2022",
32-
"Value": "3"
33-
}
34-
]
20+
"Observations": []
3521
},
3622
"Provenance": "dc/base/TemperatureEvents_Agg",
3723
"ObservationPeriod": "P1Y",
@@ -45,12 +31,7 @@
4531
"VariableMeasured": "Count_Person",
4632
"ObservationAbout": "wikidataId/Q341968",
4733
"Observations": {
48-
"Observations": [
49-
{
50-
"Date": "2016",
51-
"Value": "653"
52-
}
53-
]
34+
"Observations": []
5435
},
5536
"Provenance": "dc/base/WikidataPopulation",
5637
"ObservationPeriod": "",
@@ -64,12 +45,7 @@
6445
"VariableMeasured": "Count_Person",
6546
"ObservationAbout": "wikidataId/Q341968",
6647
"Observations": {
67-
"Observations": [
68-
{
69-
"Date": "2021",
70-
"Value": "606"
71-
}
72-
]
48+
"Observations": []
7349
},
7450
"Provenance": "dc/base/WikipediaStatsData",
7551
"ObservationPeriod": "",
@@ -78,5 +54,19 @@
7854
"ScalingFactor": "",
7955
"ImportName": "WikipediaStatsData",
8056
"ProvenanceURL": "https://www.wikipedia.org"
57+
},
58+
{
59+
"VariableMeasured": "Max_Temperature",
60+
"ObservationAbout": "wikidataId/Q341968",
61+
"Observations": {
62+
"Observations": []
63+
},
64+
"Provenance": "dc/base/NOAA_EPA_Observed_Historical_Weather",
65+
"ObservationPeriod": "P1M",
66+
"MeasurementMethod": "dcAggregate/NASAGSOD_NASAGHCN_EPA",
67+
"Unit": "Celsius",
68+
"ScalingFactor": "",
69+
"ImportName": "NOAA_EPA_Observed_Historical_Weather",
70+
"ProvenanceURL": "https://www.noaa.gov/"
8171
}
8272
]

internal/server/spanner/golden/query_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ func TestGetObservations(t *testing.T) {
348348
variables []string
349349
entities []string
350350
date string
351+
filterObs bool
351352
goldenFile string
352353
}{
353354
{
@@ -357,6 +358,7 @@ func TestGetObservations(t *testing.T) {
357358
},
358359
{
359360
entities: []string{"wikidataId/Q341968"},
361+
filterObs: true,
360362
goldenFile: "get_observations_entity.json",
361363
},
362364
{
@@ -372,7 +374,7 @@ func TestGetObservations(t *testing.T) {
372374
goldenFile: "get_observations_date.json",
373375
},
374376
} {
375-
actual, err := client.GetObservations(ctx, c.variables, c.entities, c.date)
377+
actual, err := client.GetObservations(ctx, c.variables, c.entities, c.date, c.filterObs)
376378

377379
if err != nil {
378380
t.Fatalf("GetObservations error (%v): %v", c.goldenFile, err)
@@ -417,6 +419,7 @@ func TestGetObservationsContainedInPlace(t *testing.T) {
417419
variables []string
418420
containedInPlace *v2.ContainedInPlace
419421
date string
422+
filterObs bool
420423
goldenFile string
421424
}{
422425
{
@@ -437,7 +440,7 @@ func TestGetObservationsContainedInPlace(t *testing.T) {
437440
goldenFile: "get_observations_contained_in_date.json",
438441
},
439442
} {
440-
actual, err := client.GetObservationsContainedInPlace(ctx, c.variables, c.containedInPlace, c.date)
443+
actual, err := client.GetObservationsContainedInPlace(ctx, c.variables, c.containedInPlace, c.date, c.filterObs)
441444

442445
if err != nil {
443446
t.Fatalf("GetObservations error (%v): %v", c.goldenFile, err)

internal/server/spanner/model.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (ts *TimeSeries) DecodeSpanner(val interface{}) (err error) {
7373
if !ok {
7474
return fmt.Errorf("failed to decode TimeSeries: (%v)", val)
7575
}
76+
ts.Observations = []*DateValue{}
7677
for _, v := range listVal.Values {
7778
var data map[string]string
7879
err := json.Unmarshal([]byte(v.GetStringValue()), &data)

internal/server/spanner/query.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,15 +167,15 @@ func (sc *SpannerClient) GetNodeEdgesByID(ctx context.Context, ids []string, arc
167167
}
168168

169169
// GetObservations retrieves observations from Spanner given a list of variables and entities.
170-
func (sc *SpannerClient) GetObservations(ctx context.Context, variables []string, entities []string, date string) ([]*Observation, error) {
170+
func (sc *SpannerClient) GetObservations(ctx context.Context, variables []string, entities []string, date string, filterObs bool) ([]*Observation, error) {
171171
var observations []*Observation
172172
if len(entities) == 0 {
173173
return nil, fmt.Errorf("entity must be specified")
174174
}
175175

176176
err := sc.queryAndCollect(
177177
ctx,
178-
buildBaseObsStatement(variables, entities, date),
178+
buildBaseObsStatement(variables, entities, date, filterObs),
179179
func() interface{} {
180180
return &Observation{}
181181
},
@@ -192,13 +192,13 @@ func (sc *SpannerClient) GetObservations(ctx context.Context, variables []string
192192
}
193193

194194
// GetObservationsContainedInPlace retrieves observations from Spanner given a list of variables and an entity expression.
195-
func (sc *SpannerClient) GetObservationsContainedInPlace(ctx context.Context, variables []string, containedInPlace *v2.ContainedInPlace, date string) ([]*Observation, error) {
195+
func (sc *SpannerClient) GetObservationsContainedInPlace(ctx context.Context, variables []string, containedInPlace *v2.ContainedInPlace, date string, filterObs bool) ([]*Observation, error) {
196196
var observations []*Observation
197197
if len(variables) == 0 || containedInPlace == nil {
198198
return observations, nil
199199
}
200200

201-
stmt := buildBaseObsStatement(variables, []string{} /*entities*/, date)
201+
stmt := buildBaseObsStatement(variables, []string{} /*entities*/, date, filterObs)
202202
stmt.SQL = fmt.Sprintf(statements.getObsByVariableAndContainedInPlace, stmt.SQL)
203203
stmt.Params["ancestor"] = containedInPlace.Ancestor
204204
stmt.Params["childPlaceType"] = containedInPlace.ChildPlaceType

internal/server/spanner/statements.go

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,18 @@ var statements = struct {
4343
applyOffset string
4444
// Subquery to apply page limit.
4545
applyLimit string
46-
// Fetch all Observations.
47-
getAllObs string
48-
// Fetch latest Observations.
49-
getLatestObs string
46+
// Fetch Observations.
47+
getObs string
5048
// Fetch Observations for a specific date.
5149
getDateObs string
50+
// Subquery to return all Observations.
51+
allObs string
52+
// Subquery to return the latest Observations.
53+
latestObs string
54+
// Subquery to return Observations for a specific date.
55+
dateObs string
56+
// Subquery to return empty Observations.
57+
emptyObs string
5258
// Filter by variable dcids.
5359
selectVariableDcids string
5460
// Filter by entity dcids.
@@ -245,26 +251,11 @@ var statements = struct {
245251
applyLimit: fmt.Sprintf(`
246252
LIMIT %d
247253
`, PAGE_SIZE+1),
248-
getAllObs: `
254+
getObs: `
249255
SELECT
250256
variable_measured,
251257
observation_about,
252-
observations,
253-
provenance,
254-
COALESCE(observation_period, '') AS observation_period,
255-
COALESCE(measurement_method, '') AS measurement_method,
256-
COALESCE(unit, '') AS unit,
257-
COALESCE(scaling_factor, '') AS scaling_factor,
258-
import_name,
259-
provenance_url
260-
FROM
261-
Observation
262-
`,
263-
getLatestObs: `
264-
SELECT
265-
variable_measured,
266-
observation_about,
267-
observations[ARRAY_LENGTH(observations)-1] AS observations,
258+
%s,
268259
provenance,
269260
COALESCE(observation_period, '') AS observation_period,
270261
COALESCE(measurement_method, '') AS measurement_method,
@@ -279,7 +270,7 @@ var statements = struct {
279270
SELECT
280271
variable_measured,
281272
observation_about,
282-
obs AS observations,
273+
%s,
283274
provenance,
284275
COALESCE(observation_period, '') AS observation_period,
285276
COALESCE(measurement_method, '') AS measurement_method,
@@ -291,6 +282,18 @@ var statements = struct {
291282
Observation,
292283
UNNEST(observations) as obs
293284
`,
285+
allObs: `
286+
observations
287+
`,
288+
latestObs: `
289+
[observations[ARRAY_LENGTH(observations)-1]] AS observations
290+
`,
291+
dateObs: `
292+
[obs] AS observations
293+
`,
294+
emptyObs: `
295+
ARRAY<JSON>[] AS observations
296+
`,
294297
selectVariableDcids: `
295298
variable_measured IN UNNEST(@variables)
296299
`,

internal/server/v3/observation/golden/observation_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func TestV3Observation(t *testing.T) {
222222
{
223223
req: &pbv2.ObservationRequest{
224224
Entity: &pbv2.DcidOrExpression{
225-
Dcids: []string{"wikidataId/Q341968", "wikidataId/Q1764983"},
225+
Dcids: []string{"wikidataId/Q4671576", "wikidataId/Q1764983"},
226226
},
227227
Select: []string{"entity", "variable", "date", "value"},
228228
},

0 commit comments

Comments
 (0)