Skip to content

Commit 59d336b

Browse files
authored
Switch V3 to use dc_graph_stable (#1568)
This switches Mixer to use dc_graph_stable, which has all the import groups (except dcbranch) This also updates the queries and goldens for the Node API (including adding object_bytes) and Observation API Note: * The search index is still pending cherrypick release, so the NodeSearch API will not work after this change * This doesn't yet include the isDcAggregate field, so some of the facet ids changed. As a follow up I can create a new build and then update with this field * This build includes the nodes that are missing name/types, so some of them are dropped in the goldens. After they're properly merged/resolved I can create a new build * Since querying within maps is till unclear, for now I just removed date filtering and am returning the full time series
1 parent 3136740 commit 59d336b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+75883
-31773
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
project: datcom-store
22
instance: dc-kg-test
3-
database: dc_graph_4
3+
database: dc_graph_stable

internal/proto/storage.pb.go

Lines changed: 130 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/server/spanner/datasource.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,25 +103,24 @@ func (sds *SpannerDataSource) Observation(ctx context.Context, req *pbv2.Observa
103103
date := req.Date
104104
var observations []*Observation
105105
var err error
106-
filterObs := isExistenceRequest(req.Select)
107106

108107
if entityExpr != "" {
109108
containedInPlace, err := v2.ParseContainedInPlace(entityExpr)
110109
if err != nil {
111110
return nil, fmt.Errorf("error getting observations (contained in): %v", err)
112111
}
113-
observations, err = sds.client.GetObservationsContainedInPlace(ctx, variables, containedInPlace, date, filterObs)
112+
observations, err = sds.client.GetObservationsContainedInPlace(ctx, variables, containedInPlace)
114113
if err != nil {
115114
return nil, fmt.Errorf("error getting observations (contained in): %v", err)
116115
}
117116
} else {
118-
observations, err = sds.client.GetObservations(ctx, variables, entities, date, filterObs)
117+
observations, err = sds.client.GetObservations(ctx, variables, entities)
119118
if err != nil {
120119
return nil, fmt.Errorf("error getting observations: %v", err)
121120
}
122121
}
123122

124-
observations = filterObservationsByFacet(observations, req.Filter)
123+
observations = filterObservationsByDateAndFacet(observations, date, req.Filter)
125124

126125
return observationsToObservationResponse(req, observations), nil
127126
}

internal/server/spanner/dsutil.go

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -165,15 +165,19 @@ func nodeEdgesToNodeResponse(nodes []string, edgesBySubjectID map[string][]*Edge
165165
nodeResponse.NextToken = nextToken
166166
}
167167

168-
nodeResponse.Data[subjectID] = nodeEdgesToLinkedGraph(edges)
168+
linkedGraph, err := nodeEdgesToLinkedGraph(edges)
169+
if err != nil {
170+
return nil, err
171+
}
172+
nodeResponse.Data[subjectID] = linkedGraph
169173
}
170174

171175
return nodeResponse, nil
172176
}
173177

174178
// nodeEdgesToLinkedGraph converts an array of edges to a LinkedGraph proto.
175179
// This method assumes all edges are from the same entity.
176-
func nodeEdgesToLinkedGraph(edges []*Edge) *pbv2.LinkedGraph {
180+
func nodeEdgesToLinkedGraph(edges []*Edge) (*pbv2.LinkedGraph, error) {
177181
linkedGraph := &pbv2.LinkedGraph{
178182
Arcs: make(map[string]*pbv2.Nodes),
179183
}
@@ -190,12 +194,22 @@ func nodeEdgesToLinkedGraph(edges []*Edge) *pbv2.LinkedGraph {
190194
ProvenanceId: edge.Provenance,
191195
Value: edge.ObjectValue,
192196
}
197+
198+
// Use object_bytes if set.
199+
if edge.ObjectBytes != nil {
200+
bytes, err := util.Unzip(edge.ObjectBytes)
201+
if err != nil {
202+
return nil, err
203+
}
204+
node.Value = string(bytes)
205+
}
206+
193207
nodes.Nodes = append(nodes.Nodes, node)
194208

195209
linkedGraph.Arcs[edge.Predicate] = nodes
196210
}
197211

198-
return linkedGraph
212+
return linkedGraph, nil
199213
}
200214

201215
func selectFieldsToQueryOptions(selectFields []string) queryOptions {
@@ -221,57 +235,50 @@ func isObservationRequest(qo *queryOptions) bool {
221235
return qo.date && qo.value
222236
}
223237

224-
// Whether the queryOptions are for an existence request.
225-
func isExistenceRequest(selectFields []string) bool {
226-
qo := selectFieldsToQueryOptions(selectFields)
227-
return !isObservationRequest(&qo) && !qo.facet
228-
}
229-
230-
func buildBaseObsStatement(variables []string, entities []string, date string, filterObs bool) spanner.Statement {
238+
func buildBaseObsStatement(variables []string, entities []string) spanner.Statement {
231239
stmt := spanner.Statement{
240+
SQL: statements.getObs,
232241
Params: map[string]interface{}{},
233242
}
234-
filters := []string{}
235-
236-
var baseStmt string
237-
var obsStmt string
238-
switch date {
239-
case "":
240-
baseStmt = statements.getObs
241-
obsStmt = statements.allObs
242-
case shared.LATEST:
243-
baseStmt = statements.getObs
244-
obsStmt = statements.latestObs
245-
default:
246-
baseStmt = statements.getDateObs
247-
stmt.Params["date"] = fmt.Sprintf("$.%s", date)
248-
obsStmt = statements.dateObs
249-
filters = append(filters, statements.selectDate)
250-
}
251-
252-
if filterObs {
253-
obsStmt = statements.emptyObs
254-
}
255-
stmt.SQL = fmt.Sprintf(baseStmt, obsStmt)
256243

244+
filters := []string{}
257245
if len(variables) > 0 {
258246
stmt.Params["variables"] = variables
259247
filters = append(filters, statements.selectVariableDcids)
260248
}
261-
262249
if len(entities) > 0 {
263250
stmt.Params["entities"] = entities
264251
filters = append(filters, statements.selectEntityDcids)
265252
}
266-
267253
stmt.SQL += WHERE + strings.Join(filters, AND)
268254

269255
return stmt
270256
}
271257

272-
func filterObservationsByFacet(observations []*Observation, filter *pbv2.FacetFilter) []*Observation {
258+
func filterTimeSeriesByDate(ts *TimeSeries, date string) {
259+
switch date {
260+
case "":
261+
case shared.LATEST:
262+
if ts == nil || *ts == nil || len(*ts) == 0 {
263+
*ts = TimeSeries{}
264+
} else {
265+
*ts = TimeSeries{(*ts)[len(*ts)-1]}
266+
}
267+
default:
268+
for _, dv := range *ts {
269+
if dv.Date == date {
270+
*ts = TimeSeries{dv}
271+
return
272+
}
273+
}
274+
*ts = TimeSeries{}
275+
}
276+
}
277+
278+
func filterObservationsByDateAndFacet(observations []*Observation, date string, filter *pbv2.FacetFilter) []*Observation {
273279
var filtered []*Observation
274280
for _, observation := range observations {
281+
filterTimeSeriesByDate(&observation.Observations, date)
275282
facet := observationToFacet(observation)
276283
if util.ShouldIncludeFacet(filter, facet) {
277284
filtered = append(filtered, observation)
@@ -496,7 +503,6 @@ func observationToFacetObservation(observation *Observation, includeObs bool) (*
496503
facet := observationToFacet(observation)
497504

498505
var observations []*pb.PointStat
499-
500506
for _, dateValue := range observation.Observations {
501507
pointStat, err := dateValueToPointStat(dateValue)
502508

internal/server/spanner/golden/query/get_node_edges_by_object_id.json

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
11
{
22
"FireIncidentTypeEnum": [
3+
{
4+
"SubjectID": "FireIncidentTypeEnum",
5+
"Predicate": "rangeIncludes",
6+
"ObjectID": "classification",
7+
"ObjectValue": "",
8+
"ObjectBytes": null,
9+
"Provenance": "dc/base/BaseSchema",
10+
"Name": "classification",
11+
"Types": [
12+
"Property"
13+
]
14+
},
315
{
416
"SubjectID": "FireIncidentTypeEnum",
517
"Predicate": "typeOf",
618
"ObjectID": "ComplexFire",
719
"ObjectValue": "",
20+
"ObjectBytes": null,
821
"Provenance": "dc/base/BaseSchema",
922
"Name": "ComplexFire",
1023
"Types": [
@@ -16,6 +29,7 @@
1629
"Predicate": "typeOf",
1730
"ObjectID": "PrescribedFire",
1831
"ObjectValue": "",
32+
"ObjectBytes": null,
1933
"Provenance": "dc/base/BaseSchema",
2034
"Name": "PrescribedFire",
2135
"Types": [
@@ -30,6 +44,7 @@
3044
"Predicate": "typeOf",
3145
"ObjectID": "Wildfire",
3246
"ObjectValue": "",
47+
"ObjectBytes": null,
3348
"Provenance": "dc/base/BaseSchema",
3449
"Name": "Wildfire",
3550
"Types": [
@@ -42,6 +57,7 @@
4257
"Predicate": "typeOf",
4358
"ObjectID": "WildlandFireUse",
4459
"ObjectValue": "",
60+
"ObjectBytes": null,
4561
"Provenance": "dc/base/BaseSchema",
4662
"Name": "WildlandFireUse",
4763
"Types": [
@@ -50,11 +66,24 @@
5066
}
5167
],
5268
"FoodTypeEnum": [
69+
{
70+
"SubjectID": "FoodTypeEnum",
71+
"Predicate": "rangeIncludes",
72+
"ObjectID": "foodType",
73+
"ObjectValue": "",
74+
"ObjectBytes": null,
75+
"Provenance": "dc/base/BaseSchema",
76+
"Name": "foodType",
77+
"Types": [
78+
"Property"
79+
]
80+
},
5381
{
5482
"SubjectID": "FoodTypeEnum",
5583
"Predicate": "typeOf",
5684
"ObjectID": "Meal",
5785
"ObjectValue": "",
86+
"ObjectBytes": null,
5887
"Provenance": "dc/base/BaseSchema",
5988
"Name": "Meal",
6089
"Types": [

0 commit comments

Comments
 (0)