forked from TIBCOSoftware/flogo-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathactivity.go
executable file
·151 lines (117 loc) · 3.83 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package awsiot
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"github.com/TIBCOSoftware/flogo-lib/core/activity"
"github.com/TIBCOSoftware/flogo-lib/logger"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// log is the default package logger
var log = logger.GetLogger("activity-tibco-rest")
const (
ivThingName = "thingName"
ivAwsEndpoint = "awsEndpoint"
ivDesired = "desired"
ivReported = "reported"
ovResult = "result"
)
// AwsIoT is an Activity that is used to update an Aws IoT device shadow
// inputs : {method,uri,params}
// outputs: {result}
type AwsIoT struct {
metadata *activity.Metadata
}
// NewActivity creates a new AwsIoT activity
func NewActivity(metadata *activity.Metadata) activity.Activity {
return &AwsIoT{metadata: metadata}
}
// Metadata returns the activity's metadata
func (a *AwsIoT) Metadata() *activity.Metadata {
return a.metadata
}
// Eval implements api.Activity.Eval - Invokes a Aws Iot Shadow Update
func (a *AwsIoT) Eval(context activity.Context) (done bool, err error) {
thingName := context.GetInput(ivThingName).(string)
awsEndpoint := context.GetInput(ivAwsEndpoint).(string)
req := &ShadowRequest{State: &ShadowState{}}
if context.GetInput(ivDesired) != nil {
desired := context.GetInput(ivDesired).(map[string]string)
req.State.Desired = desired
}
if context.GetInput(ivReported) != nil {
reported := context.GetInput(ivReported).(map[string]string)
req.State.Reported = reported
}
reqJSON, err := json.Marshal(req)
if err != nil {
return false, activity.NewError(err.Error(), "", nil)
}
log.Debugf("Shadow Request: %s", string(reqJSON))
brokerURI := fmt.Sprintf("ssl://%s:%d", awsEndpoint, 8883)
log.Debugf("Broker URI: %s", brokerURI)
tlsConfig := NewTLSConfig(thingName)
opts := MQTT.NewClientOptions()
opts.AddBroker(brokerURI)
opts.SetClientID(context.FlowDetails().ID())
opts.SetTLSConfig(tlsConfig)
// Start the connection
client := MQTT.NewClient(opts)
defer client.Disconnect(250)
token := client.Connect()
if token.Wait() && token.Error() != nil {
log.Errorf("Error connecting to '%s': %s", brokerURI, token.Error().Error())
return false, activity.NewError(token.Error().Error(), "", nil)
}
thingUpdate := fmt.Sprintf("$aws/things/%s/shadow/update", thingName)
Publish(client, thingUpdate, 1, string(reqJSON))
return true, nil
}
////////////////////////////////////////////////////////////////////////////////////////
// Utils
// Publish publishes a client message
func Publish(client MQTT.Client, topic string, qos int, input string) error {
token := client.Publish(topic, byte(qos), false, input)
if token.Wait() && token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
return nil
}
// NewTLSConfig creates a TLS configuration for the specified 'thing'
func NewTLSConfig(thingName string) *tls.Config {
// Import root CA
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile("things/root-CA.pem.crt")
if err == nil {
certpool.AppendCertsFromPEM(pemCerts)
}
thingDir := "things/" + thingName + "/"
// Import client certificate/key pair for the specified 'thing'
cert, err := tls.LoadX509KeyPair(thingDir+"device.pem.crt", thingDir+"device.pem.key")
if err != nil {
panic(err)
}
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
panic(err)
}
return &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{cert},
}
}
// ShadowRequest is a simple structure representing a Aws Shadow Update Request
type ShadowRequest struct {
State *ShadowState `json:"state"`
}
// ShadowState is the state to be updated
type ShadowState struct {
Desired map[string]string `json:"desired,omitempty"`
Reported map[string]string `json:"reported,omitempty"`
}