diff --git a/examples/jobs/.gitignore b/examples/jobs/.gitignore new file mode 100644 index 00000000..4724266b --- /dev/null +++ b/examples/jobs/.gitignore @@ -0,0 +1,4 @@ +jobs +root-CA.crt +certificate.pem.crt +private.pem.key diff --git a/examples/jobs/main.go b/examples/jobs/main.go new file mode 100644 index 00000000..f802f954 --- /dev/null +++ b/examples/jobs/main.go @@ -0,0 +1,109 @@ +package main + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/at-wat/mqtt-go" + "github.com/seqsense/aws-iot-device-sdk-go/v4" + "github.com/seqsense/aws-iot-device-sdk-go/v4/jobs" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + if len(os.Args) != 2 { + println("usage: jobs aws_iot_endpoint") + os.Exit(1) + } + host := os.Args[1] + + for _, file := range []string{ + "root-CA.crt", + "certificate.pem.crt", + "private.pem.key", + } { + _, err := os.Stat(file) + if os.IsNotExist(err) { + println(file, "not found") + os.Exit(1) + } + } + + cli, err := awsiotdev.New( + "sample", + &mqtt.URLDialer{ + URL: fmt.Sprintf("mqtts://%s:8883", host), + Options: []mqtt.DialOption{ + mqtt.WithTLSCertFiles( + host, + "root-CA.crt", + "certificate.pem.crt", + "private.pem.key", + ), + mqtt.WithConnStateHandler(func(s mqtt.ConnState, err error) { + fmt.Printf("%s: %v\n", s, err) + }), + }, + }, + mqtt.WithReconnectWait(500*time.Millisecond, 2*time.Second), + ) + if err != nil { + panic(err) + } + cli.Handle(mqtt.HandlerFunc(func(m *mqtt.Message) { + fmt.Printf("message dropped: %v\n", *m) + })) + + if _, err := cli.Connect(ctx, + "sample", + mqtt.WithKeepAlive(30), + ); err != nil { + panic(err) + } + + j, err := jobs.New(ctx, cli) + if err != nil { + panic(err) + } + j.OnError(func(err error) { + fmt.Printf("async error: %v\n", err) + }) + cli.Handle(j) + + processJob := func(jbs map[jobs.JobExecutionState][]jobs.JobExecutionSummary) { + if q, ok := jbs[jobs.Queued]; ok && len(q) > 0 { + fmt.Printf("> get job detail of %s\n", q[0].JobID) + jb, err := j.DescribeJob(ctx, q[0].JobID) + if err != nil { + fmt.Printf("describe job error: %v\n", err) + } else { + fmt.Printf("described: %+v\n", *jb) + } + fmt.Print("> update job status to IN_PROGRESS\n") + if err := j.UpdateJob(ctx, jb, jobs.InProgress); err != nil { + fmt.Printf("update job error: %v\n", err) + } + } else { + fmt.Printf("no queued job\n") + } + } + + j.OnJobChange(func(jbs map[jobs.JobExecutionState][]jobs.JobExecutionSummary) { + fmt.Printf("job changed: %+v\n", jbs) + processJob(jbs) + }) + + fmt.Print("> get pending jobs\n") + jbs, err := j.GetPendingJobs(ctx) + if err != nil { + panic(err) + } + fmt.Printf("jobs: %+v\n", jbs) + processJob(jbs) + + select {} +} diff --git a/jobs/clienttoken.go b/jobs/clienttoken.go new file mode 100644 index 00000000..585c8d44 --- /dev/null +++ b/jobs/clienttoken.go @@ -0,0 +1,42 @@ +// Copyright 2020 SEQSENSE, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobs + +import ( + "reflect" +) + +func clientToken(i interface{}) (string, bool) { + v := reflect.ValueOf(i).Elem().FieldByName("ClientToken") + if !v.IsValid() { + return "", false + } + if v.Kind() != reflect.String { + return "", false + } + return v.String(), true +} + +func setClientToken(i interface{}, token string) bool { + v := reflect.ValueOf(i).Elem().FieldByName("ClientToken") + if !v.IsValid() { + return false + } + if v.Kind() != reflect.String { + return false + } + v.Set(reflect.ValueOf(token)) + return true +} diff --git a/jobs/error.go b/jobs/error.go new file mode 100644 index 00000000..0298d48c --- /dev/null +++ b/jobs/error.go @@ -0,0 +1,20 @@ +// Copyright 2020 SEQSENSE, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobs + +import "errors" + +// ErrInvalidResponse is returned if failed to parse response from AWS IoT. +var ErrInvalidResponse = errors.New("invalid response from AWS IoT") diff --git a/jobs/jobs.go b/jobs/jobs.go new file mode 100644 index 00000000..43b13a6b --- /dev/null +++ b/jobs/jobs.go @@ -0,0 +1,326 @@ +// Copyright 2020 SEQSENSE, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package jobs implements AWS IoT Jobs API. +package jobs + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "github.com/at-wat/mqtt-go" + "github.com/seqsense/aws-iot-device-sdk-go/v4" +) + +// Jobs is an interface of IoT Jobs. +type Jobs interface { + mqtt.Handler + // OnError sets handler of asynchronous errors. + OnError(func(error)) + // OnJobChange sets handler for job update. + OnJobChange(func(map[JobExecutionState][]JobExecutionSummary)) + // GetPendingJobs gets list of pending jobs. + GetPendingJobs(ctx context.Context) (map[JobExecutionState][]JobExecutionSummary, error) + // DescribeJob gets details of specific job. + DescribeJob(ctx context.Context, id string) (*JobExecution, error) + // UpdateJob updates job status. + UpdateJob(ctx context.Context, j *JobExecution, s JobExecutionState, opt ...UpdateJobOption) error +} + +type jobs struct { + mqtt.ServeMux + cli mqtt.Client + thingName string + mu sync.Mutex + chResps map[string]chan interface{} + onError func(err error) + onJobChange func(map[JobExecutionState][]JobExecutionSummary) + msgToken int +} + +func (j *jobs) token() string { + j.msgToken++ + return fmt.Sprintf("%x", j.msgToken) +} + +func (j *jobs) topic(operation string) string { + return "$aws/things/" + j.thingName + "/jobs/" + operation +} + +// New creates IoT Jobs interface. +func New(ctx context.Context, cli awsiotdev.Device) (Jobs, error) { + j := &jobs{ + cli: cli, + thingName: cli.ThingName(), + chResps: make(map[string]chan interface{}), + } + for _, sub := range []struct { + topic string + handler mqtt.Handler + }{ + {j.topic("notify"), mqtt.HandlerFunc(j.notify)}, + {j.topic("+/get/accepted"), mqtt.HandlerFunc(j.getJobAccepted)}, + {j.topic("+/get/rejected"), mqtt.HandlerFunc(j.rejected)}, + {j.topic("+/update/accepted"), mqtt.HandlerFunc(j.updateJobAccepted)}, + {j.topic("+/update/rejected"), mqtt.HandlerFunc(j.rejected)}, + {j.topic("get/accepted"), mqtt.HandlerFunc(j.getAccepted)}, + {j.topic("get/rejected"), mqtt.HandlerFunc(j.rejected)}, + } { + if err := j.ServeMux.Handle(sub.topic, sub.handler); err != nil { + return nil, err + } + } + + err := cli.Subscribe(ctx, + mqtt.Subscription{Topic: j.topic("notify"), QoS: mqtt.QoS1}, + mqtt.Subscription{Topic: j.topic("get/#"), QoS: mqtt.QoS1}, + mqtt.Subscription{Topic: j.topic("+/get/#"), QoS: mqtt.QoS1}, + ) + if err != nil { + return nil, err + } + return j, nil +} + +func (j *jobs) notify(msg *mqtt.Message) { + m := &jobExecutionsChangedMessage{} + if err := json.Unmarshal(msg.Payload, m); err != nil { + j.handleError(err) + return + } + j.mu.Lock() + cb := j.onJobChange + j.mu.Unlock() + + if cb != nil { + go cb(m.Jobs) + } +} + +func (j *jobs) GetPendingJobs(ctx context.Context) (map[JobExecutionState][]JobExecutionSummary, error) { + req := &simpleRequest{ClientToken: j.token()} + ch := make(chan interface{}, 1) + j.mu.Lock() + j.chResps[req.ClientToken] = ch + j.mu.Unlock() + defer func() { + j.mu.Lock() + delete(j.chResps, req.ClientToken) + j.mu.Unlock() + }() + + breq, err := json.Marshal(req) + if err != nil { + return nil, err + } + if err := j.cli.Publish(ctx, + &mqtt.Message{ + Topic: j.topic("get"), + QoS: mqtt.QoS1, + Payload: breq, + }, + ); err != nil { + return nil, err + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-ch: + switch r := res.(type) { + case *getPendingJobExecutionsResponse: + return map[JobExecutionState][]JobExecutionSummary{ + InProgress: r.InProgressJobs, + Queued: r.QueuedJobs, + }, nil + case *ErrorResponse: + return nil, r + default: + return nil, ErrInvalidResponse + } + } +} + +func (j *jobs) DescribeJob(ctx context.Context, id string) (*JobExecution, error) { + req := &describeJobExecutionRequest{ + IncludeJobDocument: true, + ClientToken: j.token(), + } + ch := make(chan interface{}, 1) + j.mu.Lock() + j.chResps[req.ClientToken] = ch + j.mu.Unlock() + defer func() { + j.mu.Lock() + delete(j.chResps, req.ClientToken) + j.mu.Unlock() + }() + + breq, err := json.Marshal(req) + if err != nil { + return nil, err + } + if err := j.cli.Publish(ctx, + &mqtt.Message{ + Topic: j.topic(id + "/get"), + QoS: mqtt.QoS1, + Payload: breq, + }, + ); err != nil { + return nil, err + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-ch: + switch r := res.(type) { + case *describeJobExecutionResponse: + return &r.Execution, nil + case *ErrorResponse: + return nil, r + default: + return nil, ErrInvalidResponse + } + } +} + +func (j *jobs) UpdateJob(ctx context.Context, je *JobExecution, s JobExecutionState, opt ...UpdateJobOption) error { + opts := &UpdateJobOptions{ + Details: make(map[string]string), + } + for _, o := range opt { + o(opts) + } + req := &updateJobExecutionRequest{ + Status: s, + StatusDetails: opts.Details, + ExpectedVersion: je.VersionNumber, + StepTimeoutInMinutes: opts.TimeoutMinutes, + ClientToken: j.token(), + } + ch := make(chan interface{}, 1) + j.mu.Lock() + j.chResps[req.ClientToken] = ch + j.mu.Unlock() + defer func() { + j.mu.Lock() + delete(j.chResps, req.ClientToken) + j.mu.Unlock() + }() + + breq, err := json.Marshal(req) + if err != nil { + return err + } + if err := j.cli.Publish(ctx, + &mqtt.Message{ + Topic: j.topic(je.JobID + "/update"), + QoS: mqtt.QoS1, + Payload: breq, + }, + ); err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case res := <-ch: + switch r := res.(type) { + case *updateJobExecutionResponse: + return nil + case *ErrorResponse: + return r + default: + return ErrInvalidResponse + } + } +} + +func (j *jobs) handleResponse(r interface{}) { + token, ok := clientToken(r) + if !ok { + return + } + j.mu.Lock() + ch, ok := j.chResps[token] + j.mu.Unlock() + if !ok { + return + } + select { + case ch <- r: + default: + } +} + +func (j *jobs) getAccepted(msg *mqtt.Message) { + res := &getPendingJobExecutionsResponse{} + if err := json.Unmarshal(msg.Payload, res); err != nil { + j.handleError(err) + return + } + j.handleResponse(res) +} + +func (j *jobs) getJobAccepted(msg *mqtt.Message) { + res := &describeJobExecutionResponse{} + if err := json.Unmarshal(msg.Payload, res); err != nil { + j.handleError(err) + return + } + j.handleResponse(res) +} + +func (j *jobs) updateJobAccepted(msg *mqtt.Message) { + res := &updateJobExecutionResponse{} + if err := json.Unmarshal(msg.Payload, res); err != nil { + j.handleError(err) + return + } + j.handleResponse(res) +} + +func (j *jobs) rejected(msg *mqtt.Message) { + e := &ErrorResponse{} + if err := json.Unmarshal(msg.Payload, e); err != nil { + j.handleError(err) + return + } + j.handleResponse(e) +} + +func (j *jobs) OnError(cb func(err error)) { + j.mu.Lock() + j.onError = cb + j.mu.Unlock() +} + +func (j *jobs) handleError(err error) { + j.mu.Lock() + cb := j.onError + j.mu.Unlock() + if cb != nil { + cb(err) + } +} + +func (j *jobs) OnJobChange(cb func(map[JobExecutionState][]JobExecutionSummary)) { + j.mu.Lock() + j.onJobChange = cb + j.mu.Unlock() +} diff --git a/jobs/jobs_test.go b/jobs/jobs_test.go new file mode 100644 index 00000000..fa84798a --- /dev/null +++ b/jobs/jobs_test.go @@ -0,0 +1,439 @@ +// Copyright 2020 SEQSENSE, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobs + +import ( + "context" + "encoding/json" + "errors" + "reflect" + "testing" + "time" + + "github.com/at-wat/mqtt-go" + mockmqtt "github.com/at-wat/mqtt-go/mock" +) + +type mockDevice struct { + *mockmqtt.Client +} + +func (d *mockDevice) ThingName() string { + return "test" +} + +func TestNotify(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + cli := &mockDevice{&mockmqtt.Client{}} + j, err := New(ctx, cli) + if err != nil { + t.Fatal(err) + } + cli.Handle(j) + + expected := map[JobExecutionState][]JobExecutionSummary{ + Queued: []JobExecutionSummary{ + {JobID: "testID", VersionNumber: 1}, + }, + } + + done := make(chan struct{}) + j.OnJobChange(func(jbs map[JobExecutionState][]JobExecutionSummary) { + if !reflect.DeepEqual(expected, jbs) { + t.Fatalf("Expected jobs: %v, got: %v", expected, jbs) + } + close(done) + }) + + req := &jobExecutionsChangedMessage{ + Jobs: expected, + } + breq, err := json.Marshal(req) + if err != nil { + t.Fatal(err) + } + cli.Serve(&mqtt.Message{ + Topic: j.(*jobs).topic("notify"), + Payload: breq, + }) + + select { + case <-done: + case <-ctx.Done(): + t.Fatal("Timeout") + } +} + +func TestGetPendingJobs(t *testing.T) { + testCases := map[string]struct { + response interface{} + responseTopic string + expected interface{} + err error + }{ + "Success": { + response: &getPendingJobExecutionsResponse{ + InProgressJobs: []JobExecutionSummary{}, + QueuedJobs: []JobExecutionSummary{ + {JobID: "testID", VersionNumber: 1}, + }, + }, + responseTopic: "get/accepted", + expected: map[JobExecutionState][]JobExecutionSummary{ + InProgress: []JobExecutionSummary{}, + Queued: []JobExecutionSummary{ + {JobID: "testID", VersionNumber: 1}, + }, + }, + }, + "Error": { + response: &ErrorResponse{ + Code: "Failed", + Message: "Reason", + }, + responseTopic: "get/rejected", + err: &ErrorResponse{ + Code: "Failed", + Message: "Reason", + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + + var j Jobs + cli := &mockDevice{ + Client: &mockmqtt.Client{ + PublishFn: func(ctx context.Context, msg *mqtt.Message) error { + req := &simpleRequest{} + if err := json.Unmarshal(msg.Payload, req); err != nil { + t.Error(err) + cancel() + return err + } + res := testCase.response + setClientToken(res, req.ClientToken) + bres, err := json.Marshal(res) + if err != nil { + t.Error(err) + cancel() + return err + } + j.Serve(&mqtt.Message{ + Topic: j.(*jobs).topic(testCase.responseTopic), + Payload: bres, + }) + return nil + }, + }, + } + var err error + j, err = New(ctx, cli) + if err != nil { + t.Fatal(err) + } + cli.Handle(j) + + jbs, err := j.GetPendingJobs(ctx) + if err != nil { + setClientToken(err, "") + if !reflect.DeepEqual(testCase.err, err) { + t.Fatalf("Expected error: %v, got: %v", testCase.err, err) + } + } else { + if !reflect.DeepEqual(testCase.expected, jbs) { + t.Errorf("Expected jobs: %v, got: %v", testCase.expected, jbs) + } + } + }) + } +} + +func TestDescribeJob(t *testing.T) { + testCases := map[string]struct { + id string + expectedRequest interface{} + response interface{} + responseTopic string + expected interface{} + err error + }{ + "Success": { + id: "testID", + expectedRequest: &describeJobExecutionRequest{ + IncludeJobDocument: true, + }, + response: &describeJobExecutionResponse{ + Execution: JobExecution{ + JobID: "testID", + JobDocument: "doc", + StatusDetails: map[string]string{}, + }, + }, + responseTopic: "testID/get/accepted", + expected: &JobExecution{ + JobID: "testID", + JobDocument: "doc", + StatusDetails: map[string]string{}, + }, + }, + "Error": { + id: "testID", + expectedRequest: &describeJobExecutionRequest{ + IncludeJobDocument: true, + }, + response: &ErrorResponse{ + Code: "Failed", + Message: "Reason", + }, + responseTopic: "testID/get/rejected", + err: &ErrorResponse{ + Code: "Failed", + Message: "Reason", + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + + var j Jobs + cli := &mockDevice{ + Client: &mockmqtt.Client{ + PublishFn: func(ctx context.Context, msg *mqtt.Message) error { + req := &describeJobExecutionRequest{} + if err := json.Unmarshal(msg.Payload, req); err != nil { + t.Error(err) + cancel() + return err + } + clientToken := req.ClientToken + setClientToken(req, "") + if !reflect.DeepEqual(testCase.expectedRequest, req) { + t.Errorf("Expected request: %v, got: %v", testCase.expectedRequest, req) + cancel() + return errors.New("unexpected request") + } + res := testCase.response + setClientToken(res, clientToken) + bres, err := json.Marshal(res) + if err != nil { + t.Error(err) + cancel() + return err + } + j.Serve(&mqtt.Message{ + Topic: j.(*jobs).topic(testCase.responseTopic), + Payload: bres, + }) + return nil + }, + }, + } + var err error + j, err = New(ctx, cli) + if err != nil { + t.Fatal(err) + } + cli.Handle(j) + + jb, err := j.DescribeJob(ctx, testCase.id) + if err != nil { + setClientToken(err, "") + if !reflect.DeepEqual(testCase.err, err) { + t.Fatalf("Expected error: %v, got: %v", testCase.err, err) + } + } else { + if !reflect.DeepEqual(testCase.expected, jb) { + t.Errorf("Expected job detail: %v, got: %v", testCase.expected, jb) + } + } + }) + } +} + +func TestUpdateJob(t *testing.T) { + testCases := map[string]struct { + execution *JobExecution + status JobExecutionState + options []UpdateJobOption + expectedRequest interface{} + response interface{} + responseTopic string + err error + }{ + "Success/Queued": { + execution: &JobExecution{ + JobID: "testID", + JobDocument: "doc", + StatusDetails: map[string]string{}, + VersionNumber: 3, + }, + status: Queued, + expectedRequest: &updateJobExecutionRequest{ + Status: Queued, + ExpectedVersion: 3, + StatusDetails: map[string]string{}, + }, + response: &updateJobExecutionResponse{}, + responseTopic: "testID/update/accepted", + }, + "Success/Canceled": { + execution: &JobExecution{ + JobID: "testID", + JobDocument: "doc", + StatusDetails: map[string]string{}, + VersionNumber: 5, + }, + status: Canceled, + expectedRequest: &updateJobExecutionRequest{ + Status: Canceled, + ExpectedVersion: 5, + StatusDetails: map[string]string{}, + }, + response: &updateJobExecutionResponse{}, + responseTopic: "testID/update/accepted", + }, + "Success/WithTimeout": { + execution: &JobExecution{ + JobID: "testID", + JobDocument: "doc", + StatusDetails: map[string]string{}, + VersionNumber: 3, + }, + status: Queued, + options: []UpdateJobOption{ + WithTimeout(100), + }, + expectedRequest: &updateJobExecutionRequest{ + Status: Queued, + ExpectedVersion: 3, + StepTimeoutInMinutes: 100, + StatusDetails: map[string]string{}, + }, + response: &updateJobExecutionResponse{}, + responseTopic: "testID/update/accepted", + }, + "Success/WithDetails": { + execution: &JobExecution{ + JobID: "testID", + JobDocument: "doc", + StatusDetails: map[string]string{}, + VersionNumber: 3, + }, + status: Queued, + options: []UpdateJobOption{ + WithDetail("testKey1", "testValue1"), + WithDetail("testKey2", "testValue2"), + }, + expectedRequest: &updateJobExecutionRequest{ + Status: Queued, + ExpectedVersion: 3, + StatusDetails: map[string]string{ + "testKey1": "testValue1", + "testKey2": "testValue2", + }, + }, + response: &updateJobExecutionResponse{}, + responseTopic: "testID/update/accepted", + }, + + "Error": { + execution: &JobExecution{ + JobID: "testID", + JobDocument: "doc", + StatusDetails: map[string]string{}, + VersionNumber: 6, + }, + status: Queued, + expectedRequest: &updateJobExecutionRequest{ + Status: Queued, + ExpectedVersion: 6, + StatusDetails: map[string]string{}, + }, + response: &ErrorResponse{ + Code: "Failed", + Message: "Reason", + }, + responseTopic: "testID/update/rejected", + err: &ErrorResponse{ + Code: "Failed", + Message: "Reason", + }, + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + + var j Jobs + cli := &mockDevice{ + Client: &mockmqtt.Client{ + PublishFn: func(ctx context.Context, msg *mqtt.Message) error { + req := &updateJobExecutionRequest{} + if err := json.Unmarshal(msg.Payload, req); err != nil { + t.Error(err) + cancel() + return err + } + clientToken := req.ClientToken + setClientToken(req, "") + if !reflect.DeepEqual(testCase.expectedRequest, req) { + t.Errorf("Expected request: %v, got: %v", testCase.expectedRequest, req) + cancel() + return errors.New("unexpected request") + } + res := testCase.response + setClientToken(res, clientToken) + bres, err := json.Marshal(res) + if err != nil { + t.Error(err) + cancel() + return err + } + j.Serve(&mqtt.Message{ + Topic: j.(*jobs).topic(testCase.responseTopic), + Payload: bres, + }) + return nil + }, + }, + } + var err error + j, err = New(ctx, cli) + if err != nil { + t.Fatal(err) + } + cli.Handle(j) + + err = j.UpdateJob(ctx, testCase.execution, testCase.status, testCase.options...) + if err != nil { + setClientToken(err, "") + if !reflect.DeepEqual(testCase.err, err) { + t.Fatalf("Expected error: %v, got: %v", testCase.err, err) + } + } + }) + } +} diff --git a/jobs/option.go b/jobs/option.go new file mode 100644 index 00000000..1f513131 --- /dev/null +++ b/jobs/option.go @@ -0,0 +1,38 @@ +// Copyright 2020 SEQSENSE, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobs + +// UpdateJobOptions stores UpdateJob options. +type UpdateJobOptions struct { + TimeoutMinutes int + Details map[string]string +} + +// UpdateJobOption is a functional option of UpdateJob. +type UpdateJobOption func(*UpdateJobOptions) + +// WithTimeout sets UpdateJob timeout in minutes. +func WithTimeout(min int) UpdateJobOption { + return func(o *UpdateJobOptions) { + o.TimeoutMinutes = min + } +} + +// WithDetail adds detail status in key-value form. +func WithDetail(key, val string) UpdateJobOption { + return func(o *UpdateJobOptions) { + o.Details[key] = val + } +} diff --git a/jobs/type.go b/jobs/type.go new file mode 100644 index 00000000..6750d3b1 --- /dev/null +++ b/jobs/type.go @@ -0,0 +1,118 @@ +// Copyright 2020 SEQSENSE, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package jobs + +import ( + "fmt" +) + +// JobExecutionState represents job status. +type JobExecutionState string + +// JobExecutionState values. +const ( + Queued JobExecutionState = "QUEUED" + InProgress JobExecutionState = "IN_PROGRESS" + Failed JobExecutionState = "FAILED" + Succeeded JobExecutionState = "SUCCEEDED" + Canceled JobExecutionState = "CANCELED" + TimedOut JobExecutionState = "TIMED_OUT" + Rejected JobExecutionState = "REJECTED" + Removed JobExecutionState = "REMOVED" +) + +// JobExecutionSummary represents summary of a job. +type JobExecutionSummary struct { + JobID string `json:"jobId"` + QueuedAt int64 `json:"queuedAt"` + StartedAt int64 `json:"startedAt"` + LastUpdatedAt int64 `json:"lastUpdatedAt"` + VersionNumber int `json:"versionNumber"` + ExecutionNumber int `json:"executionNumber"` +} + +// JobExecution represents details of a job. +type JobExecution struct { + JobID string `json:"jobId"` + ThingName string `json:"thingName"` + JobDocument interface{} `json:"jobDocument"` + Status JobExecutionState `json:"status"` + StatusDetails map[string]string `json:"statusDetails"` + QueuedAt int64 `json:"queuedAt"` + StartedAt int64 `json:"startedAt"` + LastUpdatedAt int64 `json:"lastUpdatedAt"` + VersionNumber int `json:"versionNumber"` + ExecutionNumber int `json:"executionNumber"` +} + +// ErrorResponse represents error message from AWS IoT. +type ErrorResponse struct { + Code string `json:"code"` + Message string `json:"message"` + ClientToken string `json:"clientToken"` + Timestamp int64 `json:"timestamp"` + ExecutionState JobExecutionState `json:"executionState"` +} + +// Error implements error interface. +func (e *ErrorResponse) Error() string { + return fmt.Sprintf("%s (%s): %s", e.Code, e.ClientToken, e.Message) +} + +type jobExecutionsChangedMessage struct { + Jobs map[JobExecutionState][]JobExecutionSummary `json:"jobs"` + Timestamp int64 `json:"timestamp"` +} + +type simpleRequest struct { + ClientToken string `json:"clientToken"` +} + +type getPendingJobExecutionsResponse struct { + InProgressJobs []JobExecutionSummary `json:"inProgressJobs"` + QueuedJobs []JobExecutionSummary `json:"queuedJobs"` + Timestamp int64 `json:"timestamp"` + ClientToken string `json:"clientToken"` +} + +type describeJobExecutionRequest struct { + ExecutionNumber int `json:"executionNumber,omitempty"` + IncludeJobDocument bool `json:"includeJobDocument"` + ClientToken string `json:"clientToken"` +} + +type describeJobExecutionResponse struct { + Execution JobExecution `json:"execution"` + Timestamp int64 `json:"timestamp"` + ClientToken string `json:"clientToken"` +} + +type updateJobExecutionRequest struct { + Status JobExecutionState `json:"status"` + StatusDetails map[string]string `json:"statusDetails"` + ExpectedVersion int `json:"expectedVersion"` + ExecutionNumber int `json:"executionNumber,omitempty"` + IncludeJobExecutionState bool `json:"includeJobExecutionState,omitempty"` + IncludeJobDocument bool `json:"includeJobDocument,omitempty"` + StepTimeoutInMinutes int `json:"stepTimeoutInMinutes,omitempty"` + ClientToken string `json:"clientToken"` +} + +type updateJobExecutionResponse struct { + ExecutionState JobExecutionState `json:"executionState"` + JobDocument interface{} `json:"jobDocument"` + Timestamp int64 `json:"timestamp"` + ClientToken string `json:"clientToken"` +}