Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a5d1aca

Browse files
authoredFeb 23, 2021
shadow: support metadata (#213)
* Support metadata * Update example * Keep previous Metadata.Delta on Get response processing
1 parent 7c01526 commit a5d1aca

File tree

7 files changed

+319
-30
lines changed

7 files changed

+319
-30
lines changed
 

‎examples/shadow/main.go‎

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func main() {
9797
fmt.Printf("async error: %v\n", err)
9898
})
9999
s.OnDelta(func(delta map[string]interface{}) {
100-
fmt.Printf("delta: %+v\n", delta)
100+
fmt.Printf("delta:%s", prettyDump(delta))
101101
})
102102
cli.Handle(s)
103103

@@ -106,28 +106,52 @@ func main() {
106106
if err != nil {
107107
panic(err)
108108
}
109-
fmt.Printf("document: %+v\n", doc)
109+
fmt.Printf("document:%s", prettyDump(doc))
110110

111-
fmt.Print("> update report\n")
111+
time.Sleep(time.Second)
112+
113+
fmt.Print("\n> update report\n")
112114
doc, err = s.Report(ctx, sampleState{Value: 2, Struct: sampleStruct{Values: []int{1, 2}}})
113115
if err != nil {
114116
panic(err)
115117
}
116-
fmt.Printf("document: %+v\n", doc)
118+
fmt.Printf("document:%s", prettyDump(doc))
119+
120+
time.Sleep(time.Second)
117121

118-
fmt.Print("> get document\n")
122+
fmt.Print("\n> get document\n")
119123
doc, err = s.Get(ctx)
120124
if err != nil {
121125
panic(err)
122126
}
123-
fmt.Printf("document: %+v\n", doc)
127+
fmt.Printf("document:%s", prettyDump(doc))
128+
129+
time.Sleep(time.Second)
130+
131+
fmt.Print("\n> remove one attribute from report\n")
132+
doc, err = s.Report(ctx, map[string]interface{}{"Value": nil})
133+
if err != nil {
134+
panic(err)
135+
}
136+
fmt.Printf("document:%s", prettyDump(doc))
137+
138+
time.Sleep(time.Second)
139+
140+
fmt.Print("\n> add an another attribute to report\n")
141+
doc, err = s.Report(ctx, map[string]interface{}{"Value2": 1})
142+
if err != nil {
143+
panic(err)
144+
}
145+
fmt.Printf("document:%s", prettyDump(doc))
146+
147+
time.Sleep(time.Second)
124148

125-
fmt.Print("> delete\n")
149+
fmt.Print("\n> delete\n")
126150
err = s.Delete(ctx)
127151
if err != nil {
128152
panic(err)
129153
}
130-
fmt.Printf("document: %+v\n", s.Document())
154+
fmt.Printf("document:%s\n\n", prettyDump(s.Document()))
131155
}
132156

133157
type sampleStruct struct {

‎examples/shadow/shortprettydump.go‎

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright 2021 SEQSENSE, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"fmt"
19+
"reflect"
20+
"strings"
21+
)
22+
23+
func prettyDump(d interface{}) string {
24+
return prettyDumpImpl(reflect.ValueOf(d), 0)
25+
}
26+
27+
func prettyDumpImpl(v reflect.Value, depth int) string {
28+
if depth > 1 {
29+
return fmt.Sprintf("\t%+v", v.Interface())
30+
}
31+
depth++
32+
if v.Kind() == reflect.Ptr {
33+
v = v.Elem()
34+
}
35+
indent := strings.Repeat(" ", depth)
36+
var out string
37+
switch v.Kind() {
38+
case reflect.Slice:
39+
out = "\n"
40+
for i := 0; i < v.Len(); i++ {
41+
out += indent + "- " + prettyDumpImpl(v.Index(i), depth) + "\n"
42+
}
43+
case reflect.Map:
44+
out = "\n"
45+
for _, k := range v.MapKeys() {
46+
out += indent + k.String() + ": " + prettyDumpImpl(v.MapIndex(k), depth) + "\n"
47+
}
48+
return out
49+
case reflect.Struct:
50+
out = "\n"
51+
for i := 0; i < v.NumField(); i++ {
52+
out += indent + v.Type().Field(i).Name + ": " + prettyDumpImpl(v.Field(i), depth) + "\n"
53+
}
54+
default:
55+
if !v.IsValid() {
56+
return ""
57+
}
58+
return fmt.Sprintf("\t%+v", v.Interface())
59+
}
60+
return strings.Replace(out, "\n\n", "\n", -1)
61+
}

‎shadow/metadata.go‎

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright 2021 SEQSENSE, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package shadow
16+
17+
import (
18+
"encoding/json"
19+
"regexp"
20+
)
21+
22+
var regexLeafJSON = regexp.MustCompile(`^{[^{}]*}$`)
23+
24+
// ThingStateMetadata represents Thing Shadow State metadata.
25+
type ThingStateMetadata struct {
26+
Desired NestedMetadata `json:"desired,omitempty"`
27+
Reported NestedMetadata `json:"reported,omitempty"`
28+
Delta NestedMetadata `json:"delta,omitempty"`
29+
}
30+
31+
// Metadata represents metadata of state attribute.
32+
type Metadata struct {
33+
Timestamp int `json:"timestamp"`
34+
}
35+
36+
// NestedMetadata is JSON unmarshaller for state metadata.
37+
type NestedMetadata map[string]interface{}
38+
39+
func (n *NestedMetadata) UnmarshalJSON(b []byte) error {
40+
if *n == nil {
41+
*n = make(map[string]interface{})
42+
}
43+
j := make(map[string]json.RawMessage)
44+
if err := json.Unmarshal(b, &j); err != nil {
45+
return err
46+
}
47+
for k, v := range j {
48+
v2, err := unmarshalMetadataImpl(v)
49+
if err != nil {
50+
return err
51+
}
52+
(*n)[k] = v2
53+
}
54+
return nil
55+
}
56+
57+
func unmarshalMetadataImpl(b []byte) (interface{}, error) {
58+
switch {
59+
case b[0] == '[' && b[len(b)-1] == ']':
60+
var v2 []json.RawMessage
61+
if err := json.Unmarshal(b, &v2); err != nil {
62+
return nil, err
63+
}
64+
var v5 []interface{}
65+
for _, v3 := range v2 {
66+
v4, err := unmarshalMetadataImpl(v3)
67+
if err != nil {
68+
return nil, err
69+
}
70+
v5 = append(v5, v4)
71+
}
72+
return v5, nil
73+
case !regexLeafJSON.Match(b):
74+
var v2 NestedMetadata
75+
if err := json.Unmarshal(b, &v2); err != nil {
76+
return nil, err
77+
}
78+
return v2, nil
79+
default:
80+
var v2 Metadata
81+
if err := json.Unmarshal(b, &v2); err != nil {
82+
return nil, err
83+
}
84+
return v2, nil
85+
}
86+
}

‎shadow/metadata_test.go‎

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2021 SEQSENSE, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package shadow
16+
17+
import (
18+
"encoding/json"
19+
"reflect"
20+
"testing"
21+
)
22+
23+
func TestNestedMetadata(t *testing.T) {
24+
const input = `{"attr1":{"timestamp":1234},"attr2":{"attr3":{"timestamp":1235}},"attr4":[{"timestamp":1236},{"timestamp":1237}]}`
25+
var expected = NestedMetadata{
26+
"attr1": Metadata{Timestamp: 1234},
27+
"attr2": NestedMetadata{
28+
"attr3": Metadata{Timestamp: 1235},
29+
},
30+
"attr4": []interface{}{
31+
Metadata{Timestamp: 1236},
32+
Metadata{Timestamp: 1237},
33+
},
34+
}
35+
36+
var v NestedMetadata
37+
if err := json.Unmarshal([]byte(input), &v); err != nil {
38+
t.Fatal(err)
39+
}
40+
if !reflect.DeepEqual(expected, v) {
41+
t.Errorf("Expected:\n(%T) %+v\ngot:\n(%T) %+v", expected, expected, v, v)
42+
}
43+
}

‎shadow/shadow.go‎

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ func (s *shadow) getAccepted(msg *mqtt.Message) {
105105
return
106106
}
107107
s.mu.Lock()
108+
// For some reason, AWS IoT omits Metadata.Delta from Get response. Keep previous Metadata.Delta.
109+
doc.Metadata.Delta = s.doc.Metadata.Delta
108110
s.doc = doc
109111
s.mu.Unlock()
110112
s.handleResponse(doc)
@@ -182,6 +184,11 @@ func New(ctx context.Context, cli awsiotdev.Device, opt ...Option) (Shadow, erro
182184
Reported: map[string]interface{}{},
183185
Delta: map[string]interface{}{},
184186
},
187+
Metadata: ThingStateMetadata{
188+
Desired: map[string]interface{}{},
189+
Reported: map[string]interface{}{},
190+
Delta: map[string]interface{}{},
191+
},
185192
},
186193

187194
chResps: make(map[string]chan interface{}),

‎shadow/shadow_test.go‎

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,15 @@ func TestHandlers(t *testing.T) {
181181
t.Error(err)
182182
case delta := <-chDelta:
183183
if !reflect.DeepEqual(expectedDoc.State.Delta, delta) {
184-
t.Errorf("Expected delta: %v, got: %v",
184+
t.Errorf("Expected delta:\n%+v\ngot:\n%+v",
185185
expectedDoc.State.Delta, delta,
186186
)
187187
}
188188
default:
189189
t.Fatal("Timeout")
190190
}
191191
if !reflect.DeepEqual(expectedDoc, s.doc) {
192-
t.Errorf("Expected state: %v, got: %v",
192+
t.Errorf("Expected state:\n%v\ngot:\n%v",
193193
expectedDoc, s.doc,
194194
)
195195
}
@@ -266,15 +266,15 @@ func TestHandlers(t *testing.T) {
266266
t.Error(err)
267267
case delta := <-chDelta:
268268
if !reflect.DeepEqual(expectedDoc.State.Delta, delta) {
269-
t.Errorf("Expected delta: %v, got: %v",
269+
t.Errorf("Expected delta:\n%+v\ngot:\n%+v",
270270
expectedDoc.State.Delta, delta,
271271
)
272272
}
273273
default:
274274
t.Fatal("Timeout")
275275
}
276276
if !reflect.DeepEqual(expectedDoc, s.doc) {
277-
t.Errorf("Expected state: %v, got: %v",
277+
t.Errorf("Expected state:\n%+v\ngot\n%+v",
278278
expectedDoc, s.doc,
279279
)
280280
}
@@ -327,7 +327,7 @@ func TestHandlers(t *testing.T) {
327327
default:
328328
}
329329
if !reflect.DeepEqual(expectedDoc, s.doc) {
330-
t.Errorf("Expected state: %v, got: %v",
330+
t.Errorf("Expected state:\n%+v\ngot:\n%+v",
331331
expectedDoc, s.doc,
332332
)
333333
}
@@ -366,7 +366,7 @@ func TestHandlers(t *testing.T) {
366366
)})
367367

368368
if !reflect.DeepEqual(expectedDoc, s.doc) {
369-
t.Errorf("Expected state: %v, got: %v",
369+
t.Errorf("Expected state:\n%+v\ngot:\n%+v",
370370
expectedDoc, s.doc,
371371
)
372372
}
@@ -436,7 +436,7 @@ func TestOnDelta(t *testing.T) {
436436
done := make(chan struct{})
437437
s.OnDelta(func(delta map[string]interface{}) {
438438
if !reflect.DeepEqual(expected, delta) {
439-
t.Fatalf("Expected delta: %v, got: %v", expected, delta)
439+
t.Fatalf("Expected delta:\n%+v\ngot:\n%+v", expected, delta)
440440
}
441441
close(done)
442442
})
@@ -465,25 +465,46 @@ func TestOnDelta(t *testing.T) {
465465

466466
func TestGet(t *testing.T) {
467467
testCases := map[string]struct {
468+
initial *ThingDocument
468469
response interface{}
469470
responseTopic string
470471
expected interface{}
471472
err error
472473
}{
473474
"Success": {
475+
initial: &ThingDocument{
476+
Version: 5,
477+
State: ThingState{
478+
Delta: map[string]interface{}{"key": "value2"},
479+
},
480+
Metadata: ThingStateMetadata{
481+
Delta: map[string]interface{}{"key": Metadata{Timestamp: 1234}},
482+
},
483+
},
474484
response: &ThingDocument{
475485
Version: 5,
476486
State: ThingState{
477-
Desired: map[string]interface{}{"key": "value"},
487+
Desired: map[string]interface{}{"key": "value2"},
478488
Reported: map[string]interface{}{"key": "value"},
489+
Delta: map[string]interface{}{"key": "value2"},
490+
},
491+
Metadata: ThingStateMetadata{
492+
Desired: map[string]interface{}{"key": Metadata{Timestamp: 1234}},
493+
Reported: map[string]interface{}{"key": Metadata{Timestamp: 1235}},
479494
},
480495
},
481496
responseTopic: "get/accepted",
482497
expected: &ThingDocument{
483498
Version: 5,
484499
State: ThingState{
485-
Desired: map[string]interface{}{"key": "value"},
500+
Desired: map[string]interface{}{"key": "value2"},
486501
Reported: map[string]interface{}{"key": "value"},
502+
Delta: map[string]interface{}{"key": "value2"},
503+
},
504+
Metadata: ThingStateMetadata{
505+
Desired: NestedMetadata{"key": Metadata{Timestamp: 1234}},
506+
Reported: NestedMetadata{"key": Metadata{Timestamp: 1235}},
507+
Delta: NestedMetadata{"key": Metadata{Timestamp: 1234}},
487508
},
488509
},
489510
},
@@ -537,17 +558,20 @@ func TestGet(t *testing.T) {
537558
if err != nil {
538559
t.Fatal(err)
539560
}
561+
if testCase.initial != nil {
562+
s.(*shadow).doc = testCase.initial
563+
}
540564
cli.Handle(s)
541565

542566
doc, err := s.Get(ctx)
543567
if err != nil {
544568
setClientToken(err, "")
545569
if !reflect.DeepEqual(testCase.err, err) {
546-
t.Fatalf("Expected error: %v, got: %v", testCase.err, err)
570+
t.Fatalf("Expected error:\n%+v\ngot:\n%+v", testCase.err, err)
547571
}
548572
} else {
549573
if !reflect.DeepEqual(testCase.expected, doc) {
550-
t.Errorf("Expected document: %v, got: %v", testCase.expected, doc)
574+
t.Errorf("Expected document:\n%+v\ngot:\n%+v", testCase.expected, doc)
551575
}
552576
}
553577
})
@@ -569,6 +593,9 @@ func TestDesire(t *testing.T) {
569593
State: ThingState{
570594
Desired: map[string]interface{}{"key": "value"},
571595
},
596+
Metadata: ThingStateMetadata{
597+
Desired: map[string]interface{}{"key": Metadata{Timestamp: 1234}},
598+
},
572599
},
573600
responseTopic: "update/accepted",
574601
expected: &ThingDocument{
@@ -578,6 +605,11 @@ func TestDesire(t *testing.T) {
578605
Reported: map[string]interface{}{},
579606
Delta: map[string]interface{}{},
580607
},
608+
Metadata: ThingStateMetadata{
609+
Desired: NestedMetadata{"key": Metadata{Timestamp: 1234}},
610+
Reported: NestedMetadata{},
611+
Delta: NestedMetadata{},
612+
},
581613
},
582614
},
583615
"Error": {
@@ -637,11 +669,11 @@ func TestDesire(t *testing.T) {
637669
if err != nil {
638670
setClientToken(err, "")
639671
if !reflect.DeepEqual(testCase.err, err) {
640-
t.Fatalf("Expected error: %v, got: %v", testCase.err, err)
672+
t.Fatalf("Expected error:\n%+v\ngot:\n%+v", testCase.err, err)
641673
}
642674
} else {
643675
if !reflect.DeepEqual(testCase.expected, doc) {
644-
t.Errorf("Expected document: %v, got: %v", testCase.expected, doc)
676+
t.Errorf("Expected document:\n%+v\ngot:\n%+v", testCase.expected, doc)
645677
}
646678
}
647679
})
@@ -663,6 +695,9 @@ func TestReport(t *testing.T) {
663695
State: ThingState{
664696
Reported: map[string]interface{}{"key": "value"},
665697
},
698+
Metadata: ThingStateMetadata{
699+
Reported: map[string]interface{}{"key": Metadata{Timestamp: 1234}},
700+
},
666701
},
667702
responseTopic: "update/accepted",
668703
expected: &ThingDocument{
@@ -672,6 +707,11 @@ func TestReport(t *testing.T) {
672707
Desired: map[string]interface{}{},
673708
Delta: map[string]interface{}{},
674709
},
710+
Metadata: ThingStateMetadata{
711+
Reported: NestedMetadata{"key": Metadata{Timestamp: 1234}},
712+
Desired: NestedMetadata{},
713+
Delta: NestedMetadata{},
714+
},
675715
},
676716
},
677717
"Error": {
@@ -731,11 +771,11 @@ func TestReport(t *testing.T) {
731771
if err != nil {
732772
setClientToken(err, "")
733773
if !reflect.DeepEqual(testCase.err, err) {
734-
t.Fatalf("Expected error: %v, got: %v", testCase.err, err)
774+
t.Fatalf("Expected error:\n%+v\ngot:\n%+v", testCase.err, err)
735775
}
736776
} else {
737777
if !reflect.DeepEqual(testCase.expected, doc) {
738-
t.Errorf("Expected document: %v, got: %v", testCase.expected, doc)
778+
t.Errorf("Expected document:\n%+v\ngot:\n%+v", testCase.expected, doc)
739779
}
740780
}
741781
})
@@ -808,7 +848,7 @@ func TestDelete(t *testing.T) {
808848
if err != nil {
809849
setClientToken(err, "")
810850
if !reflect.DeepEqual(testCase.err, err) {
811-
t.Fatalf("Expected error: %v, got: %v", testCase.err, err)
851+
t.Fatalf("Expected error:\n%v\ngot:\n%v", testCase.err, err)
812852
}
813853
}
814854
})

‎shadow/state.go‎

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ type ThingState struct {
5151

5252
// ThingDocument represents Thing Shadow Document.
5353
type ThingDocument struct {
54-
State ThingState `json:"state"`
55-
Version int `json:"version,omitempty"`
56-
Timestamp int `json:"timestamp,omitempty"`
57-
ClientToken string `json:"clientToken,omitempty"`
54+
State ThingState `json:"state"`
55+
Metadata ThingStateMetadata `json:"metadata"`
56+
Version int `json:"version,omitempty"`
57+
Timestamp int `json:"timestamp,omitempty"`
58+
ClientToken string `json:"clientToken,omitempty"`
5859
}
5960

6061
type thingStateRaw struct {
@@ -66,13 +67,15 @@ type thingStateRaw struct {
6667

6768
type thingDocumentRaw struct {
6869
State thingStateRaw `json:"state"`
70+
Metadata thingStateRaw `json:"metadata"`
6971
Version int `json:"version,omitempty"`
7072
Timestamp int `json:"timestamp,omitempty"`
7173
ClientToken string `json:"clientToken,omitempty"`
7274
}
7375

7476
type thingDelta struct {
7577
State map[string]interface{} `json:"state"`
78+
Metadata NestedMetadata `json:"metadata"`
7679
Version int `json:"version,omitempty"`
7780
Timestamp int `json:"timestamp,omitempty"`
7881
}
@@ -87,9 +90,15 @@ func (s *ThingDocument) update(state *thingDocumentRaw) error {
8790
if err := updateStateRaw(s.State.Desired, state.State.Desired); err != nil {
8891
return ioterr.New(err, "updating desired state")
8992
}
93+
if err := updateStateMetadataRaw(s.Metadata.Desired, state.Metadata.Desired); err != nil {
94+
return ioterr.New(err, "updating desired state")
95+
}
9096
if err := updateStateRaw(s.State.Reported, state.State.Reported); err != nil {
9197
return ioterr.New(err, "updating reported state")
9298
}
99+
if err := updateStateMetadataRaw(s.Metadata.Reported, state.Metadata.Reported); err != nil {
100+
return ioterr.New(err, "updating reported state")
101+
}
93102
return nil
94103
}
95104

@@ -101,17 +110,25 @@ func (s *ThingDocument) updateDelta(state *thingDelta) bool {
101110
s.Version = state.Version
102111
s.Timestamp = state.Timestamp
103112
s.State.Delta = state.State
113+
s.Metadata.Delta = state.Metadata
104114
return true
105115
}
106116

107-
func updateStateRaw(state map[string]interface{}, update json.RawMessage) error {
117+
func updateStateRawCommon(state map[string]interface{}, update json.RawMessage) bool {
108118
if !hasUpdate(update) {
109-
return nil
119+
return true
110120
}
111121
if update == nil {
112122
for k := range state {
113123
delete(state, k)
114124
}
125+
return true
126+
}
127+
return false
128+
}
129+
130+
func updateStateRaw(state map[string]interface{}, update json.RawMessage) error {
131+
if updateStateRawCommon(state, update) {
115132
return nil
116133
}
117134
var u map[string]interface{}
@@ -121,6 +138,17 @@ func updateStateRaw(state map[string]interface{}, update json.RawMessage) error
121138
return updateState(state, u)
122139
}
123140

141+
func updateStateMetadataRaw(state map[string]interface{}, update json.RawMessage) error {
142+
if updateStateRawCommon(state, update) {
143+
return nil
144+
}
145+
var u NestedMetadata
146+
if err := json.Unmarshal([]byte(update), &u); err != nil {
147+
return ioterr.New(err, "unmarshaling update")
148+
}
149+
return updateState(state, u)
150+
}
151+
124152
func updateState(state map[string]interface{}, update map[string]interface{}) error {
125153
if len(update) == 0 {
126154
for k := range state {

0 commit comments

Comments
 (0)
Please sign in to comment.