Skip to content

Commit

Permalink
Providing a Common MQTT Mapper
Browse files Browse the repository at this point in the history
Signed-off-by: Prometheus-collab <[email protected]>
  • Loading branch information
Prometheus-collab committed Sep 18, 2024
1 parent 1fe269d commit 95c0447
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 16 deletions.
14 changes: 6 additions & 8 deletions mappers/kubeedge-v1.17.0/mqtt-mapper/driver/devicetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,18 @@ type VisitorConfig struct {
type VisitorConfigData struct {
DataType string `json:"dataType"`

ClientID string `json:"clientID"` // MQTT Client ID
DeviceInfo string `json:"deviceInfo"` // Device information, such as device identification or other important information.
OperationInfo OperationInfoType `json:"operationInfo"` // Operation information, such as adding, deleting, modifying and so on.
SerializedFormat SerializedFormatType `json:"fileType"` // Supported formats: json, xml and yaml.
ParsedMessage interface{} `json:"parsedMessage"` // The parsed message
ClientID string `json:"clientID"` // MQTT Client ID
DeviceInfo string `json:"deviceInfo"` // Device information, such as device identification or other important information.
OperationInfo OperationInfoType `json:"operationInfo"` // Operation information, such as adding, deleting, modifying and so on.
SerializedFormat SerializedFormatType `json:"fileType"` // Supported formats: json, xml and yaml.
ParsedMessage map[string]interface{} `json:"parsedMessage"` // The parsed message
}

// OperationInfoType defines the enumeration values for device operation.
type OperationInfoType uint

const (
FULLTEXTMODIFY OperationInfoType = iota // full text revision
PATHMODIFY // path revision
VALUEMODIFY // value revision
UPDATE OperationInfoType = iota // revision
)

// SerializedFormatType defines the enumeration values for serialized types.
Expand Down
114 changes: 106 additions & 8 deletions mappers/kubeedge-v1.17.0/mqtt-mapper/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/xml"
"errors"
"fmt"
"reflect"
"strings"
"sync"

"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -132,9 +134,7 @@ func (c *ConfigData) GetMessage() (string, error) {

// OperationInfoType and SerializedFormatType mappings
var operationTypeMap = map[string]OperationInfoType{
"fulltextmodify": FULLTEXTMODIFY,
"pathmodify": PATHMODIFY,
"valuemodify": VALUEMODIFY,
"update": UPDATE,
}

var serializedFormatMap = map[string]SerializedFormatType{
Expand Down Expand Up @@ -177,7 +177,7 @@ func (c *ConfigData) SplitTopic() (string, OperationInfoType, SerializedFormatTy
// The function ParseMessage parses the Message field according to the incoming type.
// parseType(0: json, 1: yaml, 2: xml)
// The value interface{} represents the parsed structure.
func (c *ConfigData) ParseMessage(parseType SerializedFormatType) (interface{}, error) {
func (c *ConfigData) ParseMessage(parseType SerializedFormatType) (map[string]interface{}, error) {
if c.Message == "" {
return nil, errors.New("message is empty")
}
Expand All @@ -200,20 +200,28 @@ func (c *ConfigData) ParseMessage(parseType SerializedFormatType) (interface{},
return nil, err
}
c.Message = convertedMessage
return c.parseJSON()
originalMap, err := c.parseJSON()
var mp map[string]interface{}
for _, value := range originalMap {
if nestedMap, ok := value.(map[string]interface{}); ok {
mp = nestedMap
break
}
}
return mp, err

default:
return nil, errors.New("unsupported parse type")
}
}

// The function parseJSON parses the Message field of the ConfigData (assumed to be a JSON string).
func (c *ConfigData) parseJSON() (interface{}, error) {
func (c *ConfigData) parseJSON() (map[string]interface{}, error) {
if c.Message == "" {
return nil, errors.New("message is empty")
}

var result interface{}
var result map[string]interface{}
err := json.Unmarshal([]byte(c.Message), &result)
if err != nil {
return nil, err
Expand Down Expand Up @@ -267,6 +275,95 @@ func (c *ConfigData) NewVisitorConfigData() (*VisitorConfigData, error) {
}, nil
}

/* --------------------------------------------------------------------------------------- */
func (v *VisitorConfigData) ModifyVisitorConfigData(destDataConfig interface{}) error {
destValue := reflect.ValueOf(destDataConfig)
if destValue.Kind() != reflect.Ptr || destValue.Elem().Kind() != reflect.Struct {
return errors.New("destDataConfig must be a pointer to a struct")
}

destValue = destValue.Elem()

var tagName string
switch v.SerializedFormat {
case JSON:
tagName = "json"
case YAML:
tagName = "yaml"
case XML:
tagName = "xml"
default:
return errors.New("unknown serialized format")
}

// Update the destination struct using JSON tag
if err := updateStructFields(destValue, v.ParsedMessage, tagName); err != nil {
return err
}

return nil
}

// updateStructFields recursively updates struct fields from the given map using specified tag type
func updateStructFields(structValue reflect.Value, data map[string]interface{}, tagName string) error {
structType := structValue.Type()

for i := 0; i < structValue.NumField(); i++ {
field := structValue.Field(i)
fieldType := structType.Field(i)
tagValue := fieldType.Tag.Get(tagName)

if tagValue == "" {
// Skip fields without the specified tag
continue
}

// Get the corresponding value from the map
value, exists := data[tagValue]
if !exists {
continue
}

// Update the field based on its kind
if field.Kind() == reflect.Struct {
// Recursively update nested structs
nestedData, ok := value.(map[string]interface{})
if !ok {
return fmt.Errorf("type mismatch for nested field %s", tagValue)
}
if err := updateStructFields(field, nestedData, tagName); err != nil {
return err
}
} else if field.Kind() == reflect.Slice {
// Handle slices if necessary
sliceData, ok := value.([]interface{})
if !ok {
return fmt.Errorf("type mismatch for slice field %s", tagValue)
}
newSlice := reflect.MakeSlice(field.Type(), len(sliceData), len(sliceData))
for j, item := range sliceData {
itemValue := reflect.ValueOf(item)
if newSlice.Index(j).Kind() == itemValue.Kind() {
newSlice.Index(j).Set(itemValue)
} else {
return fmt.Errorf("type mismatch for slice item in field %s", tagValue)
}
}
field.Set(newSlice)
} else {
// Set the field value
fieldValue := reflect.ValueOf(value)
if field.Type() == fieldValue.Type() {
field.Set(fieldValue)
} else {
return fmt.Errorf("type mismatch for field %s", tagValue)
}
}
}
return nil
}


/* --------------------------------------------------------------------------------------- */
// The function ConvertYAMLToJSON converts a YAML string to a JSON string.
func convertYAMLToJSON(yamlString string) (string, error) {
Expand Down Expand Up @@ -325,7 +422,8 @@ func wrapXMLWithRoot(xmlString string) string {
}

// Wrap the remaining XML content with <root></root>
wrappedXML := "<root>" + xmlString + "</root>"
// wrappedXML := "<root>" + xmlString + "</root>"
wrappedXML := xmlString
return wrappedXML
}

Expand Down

0 comments on commit 95c0447

Please sign in to comment.