Skip to content
This repository was archived by the owner on May 27, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions pkg/eaa/api_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package eaa

import (
"errors"
"net/http"

"github.com/gorilla/websocket"
)
Expand All @@ -17,60 +16,6 @@ var socket = websocket.Upgrader{
WriteBufferSize: 512,
}

// createWsConn creates a websocket connection for a consumer
// to receive data from subscribed producers
func createWsConn(w http.ResponseWriter, r *http.Request) (int, error) {
eaaCtx := r.Context().Value(contextKey("appliance-ctx")).(*Context)

// Get the consumer app ID from the Common Name in the certificate
commonName := r.TLS.PeerCertificates[0].Subject.CommonName

// Check if urn ID matches the Host included in the request header
if commonName != r.Host {
return http.StatusUnauthorized,
errors.New("401: Incorrect app ID")
}

eaaCtx.consumerConnections.Lock()
defer eaaCtx.consumerConnections.Unlock()

// Check if connection was created for urn ID, if so send close
// message, close the connection and delete the entry in the
// connections structure
foundConn, connFound := eaaCtx.consumerConnections.m[commonName]
if connFound {
prevConn := foundConn.connection
msgType := websocket.CloseMessage
closeMessage := websocket.FormatCloseMessage(
websocket.CloseServiceRestart,
"New connection request, closing this connection")
err := prevConn.WriteMessage(msgType, closeMessage)
if err != nil {
log.Info("Failed to send close message to old connection")
}
err = prevConn.Close()
if err != nil {
log.Info("Failed to close previous websocket connection")
}
delete(eaaCtx.consumerConnections.m, commonName)
}

// Create nil connection obj in consumerConnections map. That means the
// procedure of web socket connection has started.
eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{
connection: nil}
conn, err := socket.Upgrade(w, r, nil)
if err != nil {
delete(eaaCtx.consumerConnections.m, commonName)
return 0, err
}

eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{
connection: conn}

return 0, nil
}

// getConsumerSubscriptions returns a list of subscriptions belonging
// to the consumer
func getConsumerSubscriptions(commonName string,
Expand Down
132 changes: 131 additions & 1 deletion pkg/eaa/api_eaa.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
package eaa

import (
"bytes"
"encoding/json"
"net/http"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -61,6 +63,134 @@ func DeregisterApplication(w http.ResponseWriter, r *http.Request) {
commonName)
}

// HandleProducerNotifications is a go routine which continually anticipates
// notification messages from a Publisher application over the connected websocket
func HandleProducerNotifications(commonName string, connection *websocket.Conn, eaaCtx *Context) {
for {
// Read incoming message from websocket
_, wsMsg, err := connection.ReadMessage()
if err != nil {
log.Debug("Stopped reading messages")
return
}

// Check if application is registered as Producer
eaaCtx.serviceInfo.RLock()
_, serviceFound := eaaCtx.serviceInfo.m[commonName]
eaaCtx.serviceInfo.RUnlock()
if !serviceFound {
log.Err("Application not registered as a Producer")
continue
}

var notification NotificationFromProducer

// Attempt to parse message as JSON
msgBytes := bytes.NewReader(wsMsg)
err = json.NewDecoder(msgBytes).Decode(&notification)
if err != nil {
log.Errf("JSON parsing error in notification: %s", err.Error())
continue
}

// Generate URN structure
URN, err := CommonNameStringToURN(commonName)
if err != nil {
log.Errf("Error during URN generation: %s", err.Error())
continue
}

// Construct notification topic
notifTopic := getNotificationTopicName(URN.Namespace)

// Add a Publisher to the Notification Namespace topic (if not already)
err = eaaCtx.MsgBrokerCtx.addPublisher(notificationPublisher, notifTopic, nil)
if err != nil {
// Ignore objectAlreadyExistsError error
if _, ok := err.(objectAlreadyExistsError); !ok {
log.Errf("Error when adding a Publisher of type: '%v', id: '%v'. Error: %s",
notificationPublisher, notifTopic, err.Error())
continue
}
}

// Prepare a NotificationMessage that will be published via the message broker
notifMsg := NotificationMessage{Notification: &notification, URN: &URN}

// Create a Watermill Message
data, err := json.Marshal(notifMsg)
if err != nil {
log.Errf("Error during Service structure marshaling: %s", err.Error())
continue
}

// Publish message to the broker
msgToPublish := message.NewMessage(commonName, data)
err = eaaCtx.MsgBrokerCtx.publish(notifTopic, msgToPublish)
if err != nil {
log.Errf("Error while publishing message to the broker: %s", err.Error())
}
}
}

// CreateWebsocketConnection creates a bi-directional websocket for consumers
// to receive data from producers, and for producers to post notifications
// to subcribed consumers over a persisted websocket connection
func CreateWebsocketConnection(w http.ResponseWriter, r *http.Request) (int, error) {
eaaCtx := r.Context().Value(contextKey("appliance-ctx")).(*Context)

// Get the consumer app ID from the Common Name in the certificate
commonName := r.TLS.PeerCertificates[0].Subject.CommonName

// Check if urn ID matches the Host included in the request header
if commonName != r.Host {
return http.StatusUnauthorized,
errors.New("401: Incorrect app ID")
}

eaaCtx.consumerConnections.Lock()
defer eaaCtx.consumerConnections.Unlock()

// Check if connection was created for urn ID, if so send close
// message, close the connection and delete the entry in the
// connections structure
foundConn, connFound := eaaCtx.consumerConnections.m[commonName]
if connFound {
prevConn := foundConn.connection
msgType := websocket.CloseMessage
closeMessage := websocket.FormatCloseMessage(
websocket.CloseServiceRestart,
"New connection request, closing this connection")
err := prevConn.WriteMessage(msgType, closeMessage)
if err != nil {
log.Info("Failed to send close message to old connection")
}
err = prevConn.Close()
if err != nil {
log.Info("Failed to close previous websocket connection")
}
delete(eaaCtx.consumerConnections.m, commonName)
}

// Create nil connection obj in consumerConnections map. That means the
// procedure of web socket connection has started.
eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{
connection: nil}
conn, err := socket.Upgrade(w, r, nil)
if err != nil {
delete(eaaCtx.consumerConnections.m, commonName)
return 0, err
}

eaaCtx.consumerConnections.m[commonName] = ConsumerConnection{
connection: conn}

// Spawn a go routine to listen for notification messages from Producer apps
go HandleProducerNotifications(commonName, conn, eaaCtx)

return 0, nil
}

// GetNotifications implements https API
func GetNotifications(w http.ResponseWriter, r *http.Request) {
eaaCtx := r.Context().Value(contextKey("appliance-ctx")).(*Context)
Expand All @@ -72,7 +202,7 @@ func GetNotifications(w http.ResponseWriter, r *http.Request) {
}
eaaCtx.serviceInfo.RUnlock()

statCode, err := createWsConn(w, r)
statCode, err := CreateWebsocketConnection(w, r)
if err != nil {
log.Errf("Error in WebSocket Connection Creation: %#v", err)
if statCode != 0 {
Expand Down