Skip to content

Commit

Permalink
jobs: implement IoT Jobs (#121)
Browse files Browse the repository at this point in the history
* Implement get/update
* Handle notify event
  • Loading branch information
at-wat authored Jul 20, 2020
1 parent fa54117 commit ceb8ae7
Show file tree
Hide file tree
Showing 8 changed files with 1,096 additions and 0 deletions.
4 changes: 4 additions & 0 deletions examples/jobs/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
jobs
root-CA.crt
certificate.pem.crt
private.pem.key
109 changes: 109 additions & 0 deletions examples/jobs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package main

import (
"context"
"fmt"
"os"
"time"

"github.com/at-wat/mqtt-go"
"github.com/seqsense/aws-iot-device-sdk-go/v4"
"github.com/seqsense/aws-iot-device-sdk-go/v4/jobs"
)

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if len(os.Args) != 2 {
println("usage: jobs aws_iot_endpoint")
os.Exit(1)
}
host := os.Args[1]

for _, file := range []string{
"root-CA.crt",
"certificate.pem.crt",
"private.pem.key",
} {
_, err := os.Stat(file)
if os.IsNotExist(err) {
println(file, "not found")
os.Exit(1)
}
}

cli, err := awsiotdev.New(
"sample",
&mqtt.URLDialer{
URL: fmt.Sprintf("mqtts://%s:8883", host),
Options: []mqtt.DialOption{
mqtt.WithTLSCertFiles(
host,
"root-CA.crt",
"certificate.pem.crt",
"private.pem.key",
),
mqtt.WithConnStateHandler(func(s mqtt.ConnState, err error) {
fmt.Printf("%s: %v\n", s, err)
}),
},
},
mqtt.WithReconnectWait(500*time.Millisecond, 2*time.Second),
)
if err != nil {
panic(err)
}
cli.Handle(mqtt.HandlerFunc(func(m *mqtt.Message) {
fmt.Printf("message dropped: %v\n", *m)
}))

if _, err := cli.Connect(ctx,
"sample",
mqtt.WithKeepAlive(30),
); err != nil {
panic(err)
}

j, err := jobs.New(ctx, cli)
if err != nil {
panic(err)
}
j.OnError(func(err error) {
fmt.Printf("async error: %v\n", err)
})
cli.Handle(j)

processJob := func(jbs map[jobs.JobExecutionState][]jobs.JobExecutionSummary) {
if q, ok := jbs[jobs.Queued]; ok && len(q) > 0 {
fmt.Printf("> get job detail of %s\n", q[0].JobID)
jb, err := j.DescribeJob(ctx, q[0].JobID)
if err != nil {
fmt.Printf("describe job error: %v\n", err)
} else {
fmt.Printf("described: %+v\n", *jb)
}
fmt.Print("> update job status to IN_PROGRESS\n")
if err := j.UpdateJob(ctx, jb, jobs.InProgress); err != nil {
fmt.Printf("update job error: %v\n", err)
}
} else {
fmt.Printf("no queued job\n")
}
}

j.OnJobChange(func(jbs map[jobs.JobExecutionState][]jobs.JobExecutionSummary) {
fmt.Printf("job changed: %+v\n", jbs)
processJob(jbs)
})

fmt.Print("> get pending jobs\n")
jbs, err := j.GetPendingJobs(ctx)
if err != nil {
panic(err)
}
fmt.Printf("jobs: %+v\n", jbs)
processJob(jbs)

select {}
}
42 changes: 42 additions & 0 deletions jobs/clienttoken.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2020 SEQSENSE, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobs

import (
"reflect"
)

func clientToken(i interface{}) (string, bool) {
v := reflect.ValueOf(i).Elem().FieldByName("ClientToken")
if !v.IsValid() {
return "", false
}
if v.Kind() != reflect.String {
return "", false
}
return v.String(), true
}

func setClientToken(i interface{}, token string) bool {
v := reflect.ValueOf(i).Elem().FieldByName("ClientToken")
if !v.IsValid() {
return false
}
if v.Kind() != reflect.String {
return false
}
v.Set(reflect.ValueOf(token))
return true
}
20 changes: 20 additions & 0 deletions jobs/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2020 SEQSENSE, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobs

import "errors"

// ErrInvalidResponse is returned if failed to parse response from AWS IoT.
var ErrInvalidResponse = errors.New("invalid response from AWS IoT")
Loading

0 comments on commit ceb8ae7

Please sign in to comment.