diff --git a/pkg/eaa/api_consumer.go b/pkg/eaa/api_consumer.go index 76bbd72..fec1272 100644 --- a/pkg/eaa/api_consumer.go +++ b/pkg/eaa/api_consumer.go @@ -5,7 +5,6 @@ package eaa import ( "errors" - "net/http" "github.com/gorilla/websocket" ) @@ -17,60 +16,6 @@ var socket = websocket.Upgrader{ WriteBufferSize: 512, } -// createWsConn creates a websocket connection for a consumer -// to receive data from subscribed producers -func createWsConn(w http.ResponseWriter, r *http.Request) (int, error) { - eaaCtx := r.Context().Value(contextKey("appliance-ctx")).(*Context) - - // Get the consumer app ID from the Common Name in the certificate - commonName := r.TLS.PeerCertificates[0].Subject.CommonName - - // Check if urn ID matches the Host included in the request header - if commonName != r.Host { - return http.StatusUnauthorized, - errors.New("401: Incorrect app ID") - } - - eaaCtx.consumerConnections.Lock() - defer eaaCtx.consumerConnections.Unlock() - - // Check if connection was created for urn ID, if so send close - // message, close the connection and delete the entry in the - // connections structure - foundConn, connFound := eaaCtx.consumerConnections.m[commonName] - if connFound { - prevConn := foundConn.connection - msgType := websocket.CloseMessage - closeMessage := websocket.FormatCloseMessage( - websocket.CloseServiceRestart, - "New connection request, closing this connection") - err := prevConn.WriteMessage(msgType, closeMessage) - if err != nil { - log.Info("Failed to send close message to old connection") - } - err = prevConn.Close() - if err != nil { - log.Info("Failed to close previous websocket connection") - } - delete(eaaCtx.consumerConnections.m, commonName) - } - - // Create nil connection obj in consumerConnections map. That means the - // procedure of web socket connection has started. - eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{ - connection: nil} - conn, err := socket.Upgrade(w, r, nil) - if err != nil { - delete(eaaCtx.consumerConnections.m, commonName) - return 0, err - } - - eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{ - connection: conn} - - return 0, nil -} - // getConsumerSubscriptions returns a list of subscriptions belonging // to the consumer func getConsumerSubscriptions(commonName string, diff --git a/pkg/eaa/api_eaa.go b/pkg/eaa/api_eaa.go index f0b3c5b..a124a9a 100644 --- a/pkg/eaa/api_eaa.go +++ b/pkg/eaa/api_eaa.go @@ -4,11 +4,13 @@ package eaa import ( + "bytes" "encoding/json" "net/http" "github.com/ThreeDotsLabs/watermill/message" "github.com/gorilla/mux" + "github.com/gorilla/websocket" "github.com/pkg/errors" ) @@ -61,6 +63,134 @@ func DeregisterApplication(w http.ResponseWriter, r *http.Request) { commonName) } +// HandleProducerNotifications is a go routine which continually anticipates +// notification messages from a Publisher application over the connected websocket +func HandleProducerNotifications(commonName string, connection *websocket.Conn, eaaCtx *Context) { + for { + // Read incoming message from websocket + _, wsMsg, err := connection.ReadMessage() + if err != nil { + log.Debug("Stopped reading messages") + return + } + + // Check if application is registered as Producer + eaaCtx.serviceInfo.RLock() + _, serviceFound := eaaCtx.serviceInfo.m[commonName] + eaaCtx.serviceInfo.RUnlock() + if !serviceFound { + log.Err("Application not registered as a Producer") + continue + } + + var notification NotificationFromProducer + + // Attempt to parse message as JSON + msgBytes := bytes.NewReader(wsMsg) + err = json.NewDecoder(msgBytes).Decode(¬ification) + if err != nil { + log.Errf("JSON parsing error in notification: %s", err.Error()) + continue + } + + // Generate URN structure + URN, err := CommonNameStringToURN(commonName) + if err != nil { + log.Errf("Error during URN generation: %s", err.Error()) + continue + } + + // Construct notification topic + notifTopic := getNotificationTopicName(URN.Namespace) + + // Add a Publisher to the Notification Namespace topic (if not already) + err = eaaCtx.MsgBrokerCtx.addPublisher(notificationPublisher, notifTopic, nil) + if err != nil { + // Ignore objectAlreadyExistsError error + if _, ok := err.(objectAlreadyExistsError); !ok { + log.Errf("Error when adding a Publisher of type: '%v', id: '%v'. Error: %s", + notificationPublisher, notifTopic, err.Error()) + continue + } + } + + // Prepare a NotificationMessage that will be published via the message broker + notifMsg := NotificationMessage{Notification: ¬ification, URN: &URN} + + // Create a Watermill Message + data, err := json.Marshal(notifMsg) + if err != nil { + log.Errf("Error during Service structure marshaling: %s", err.Error()) + continue + } + + // Publish message to the broker + msgToPublish := message.NewMessage(commonName, data) + err = eaaCtx.MsgBrokerCtx.publish(notifTopic, msgToPublish) + if err != nil { + log.Errf("Error while publishing message to the broker: %s", err.Error()) + } + } +} + +// CreateWebsocketConnection creates a bi-directional websocket for consumers +// to receive data from producers, and for producers to post notifications +// to subcribed consumers over a persisted websocket connection +func CreateWebsocketConnection(w http.ResponseWriter, r *http.Request) (int, error) { + eaaCtx := r.Context().Value(contextKey("appliance-ctx")).(*Context) + + // Get the consumer app ID from the Common Name in the certificate + commonName := r.TLS.PeerCertificates[0].Subject.CommonName + + // Check if urn ID matches the Host included in the request header + if commonName != r.Host { + return http.StatusUnauthorized, + errors.New("401: Incorrect app ID") + } + + eaaCtx.consumerConnections.Lock() + defer eaaCtx.consumerConnections.Unlock() + + // Check if connection was created for urn ID, if so send close + // message, close the connection and delete the entry in the + // connections structure + foundConn, connFound := eaaCtx.consumerConnections.m[commonName] + if connFound { + prevConn := foundConn.connection + msgType := websocket.CloseMessage + closeMessage := websocket.FormatCloseMessage( + websocket.CloseServiceRestart, + "New connection request, closing this connection") + err := prevConn.WriteMessage(msgType, closeMessage) + if err != nil { + log.Info("Failed to send close message to old connection") + } + err = prevConn.Close() + if err != nil { + log.Info("Failed to close previous websocket connection") + } + delete(eaaCtx.consumerConnections.m, commonName) + } + + // Create nil connection obj in consumerConnections map. That means the + // procedure of web socket connection has started. + eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{ + connection: nil} + conn, err := socket.Upgrade(w, r, nil) + if err != nil { + delete(eaaCtx.consumerConnections.m, commonName) + return 0, err + } + + eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{ + connection: conn} + + // Spawn a go routine to listen for notification messages from Producer apps + go HandleProducerNotifications(commonName, conn, eaaCtx) + + return 0, nil +} + // GetNotifications implements https API func GetNotifications(w http.ResponseWriter, r *http.Request) { eaaCtx := r.Context().Value(contextKey("appliance-ctx")).(*Context) @@ -72,7 +202,7 @@ func GetNotifications(w http.ResponseWriter, r *http.Request) { } eaaCtx.serviceInfo.RUnlock() - statCode, err := createWsConn(w, r) + statCode, err := CreateWebsocketConnection(w, r) if err != nil { log.Errf("Error in WebSocket Connection Creation: %#v", err) if statCode != 0 {