Some drawbacks for current kubeedge message transport mechanism #10

Open
GsssC opened this issue Feb 3, 2021 · 0 comments

GsssC (Member) commented Feb 3, 2021

Analysis of the Message mechanism

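This analyzes the current Message mechanism to prepare for later optimization; the routing part in particular needs to be cleaned up. Start with the current definition of Message: skipping Header for now, expand MessageRoute.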

type Message struct {
	Header  MessageHeader `json:"header"`
	Router  MessageRoute  `json:"route,omitempty"`
	Content interface{}   `json:"content"`
}

// MessageRoute contains the routing information of a message
type MessageRoute struct {
	// Source is the module that sent the message
	Source string `json:"source,omitempty"`
	// Group is the broadcast group the message belongs to
	Group string `json:"group,omitempty"`

	// Operation is the type of operation the message performs on Content
	Operation string `json:"operation,omitempty"`
	// Resource identifies the resource carried in Content; it has two formats
	// on the edge:   kube-system/endpoints/kube-scheduler
	// in the cloud:  node/edgenode-foo/kube-system/endpoints/kube-scheduler
	Resource string `json:"resource,omitempty"`
}
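Because the node name exists only inside the cloud-side Resource string, a receiver has to split that string to recover it. The sketch below assumes the layout node/<nodename>/<namespace>/<type>/<name> taken from the comment above; parseCloudResource and cloudResource are hypothetical names, not KubeEdge APIs.

```go
package main

import (
	"fmt"
	"strings"
)

// cloudResource holds the pieces of a cloud-side Resource string.
// Hypothetical type; the layout is assumed from the MessageRoute comment.
type cloudResource struct {
	NodeName     string
	EdgeResource string // the edge-side form "<namespace>/<type>/<name>"
}

// parseCloudResource splits "node/<node>/<rest>" into the node name and the
// edge-side resource. Hypothetical helper, not part of KubeEdge.
func parseCloudResource(resource string) (cloudResource, error) {
	parts := strings.SplitN(resource, "/", 3)
	if len(parts) != 3 || parts[0] != "node" || parts[1] == "" {
		return cloudResource{}, fmt.Errorf("unexpected cloud resource %q", resource)
	}
	return cloudResource{NodeName: parts[1], EdgeResource: parts[2]}, nil
}

func main() {
	r, err := parseCloudResource("node/edgenode-foo/kube-system/endpoints/kube-scheduler")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.NodeName)     // edgenode-foo
	fmt.Println(r.EdgeResource) // kube-system/endpoints/kube-scheduler
}
```

An edge-side string such as kube-system/endpoints/kube-scheduler is rejected here, since it carries no node prefix at all.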
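  • The message definition has no DestinationModule field. Instead, the destination is passed as a function argument when the message is sent, Send(module string, message model.Message), where module == DestinationModule. The definition has no nodename field either; node information is packed into Message.Resource.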
    MessageContext

    // MessageContext is interface for message syncing
    type MessageContext interface {
        // async mode
        Send(module string, message model.Message)
        Receive(module string) (model.Message, error)
        // sync mode
        SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error)
        SendResp(message model.Message)
        // group broadcast
        SendToGroup(moduleType string, message model.Message)
        SendToGroupSync(moduleType string, message model.Message, timeout time.Duration) error
    }

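    For point-to-point transmission this is not a problem. But when a message crosses several modules, every module along the way needs routing logic to relay it to the destination module, which badly hurts code readability. For example, Edge's MetaManager module sends a Query message to Cloud's EdgeController module to get a given configmap or secret. Looking only at the original sending code, or at what the log prints, it is very hard to understand what happens: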
    processRemoteQuery

    func (m *metaManager) processRemoteQuery(message model.Message) {
    	go func() {
    		// TODO: retry
    		originalID := message.GetID()
    		message.UpdateID()
    		resp, err := beehiveContext.SendSync(
    			string(metaManagerConfig.Config.ContextSendModule), // this argument points to the cloudhub module
    			message,
    			time.Duration(metaManagerConfig.Config.RemoteQueryTimeout)*time.Second)
    		klog.Infof("########## process get: req[%+v], resp[%+v], err[%+v]", message, resp, err)
    		...

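    Here, the SendSync() argument says the message goes to the cloudhub module; the complete path the developer intends is MetaManager -> EdgeHub -> CloudHub -> EdgeController.UpstreamController. To understand the code above, you first need to know the routing logic written into the two intermediate modules: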

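    1. Every message sent to EdgeHub is forwarded up to CloudHub.
    2. When CloudHub receives a message from the edge, it dispatches it to a different module depending on the message's Source.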

    Publish

    // cloud/pkg/cloudhub/channelq/channelq.go
    func (q *ChannelMessageQueue) Publish(msg *beehiveModel.Message) error {
    	switch msg.Router.Source {
    	case model.ResTwin:
    		beehiveContext.SendToGroup(model.SrcDeviceController, *msg)
    	default:
    		beehiveContext.SendToGroup(model.SrcEdgeController, *msg)
    	}
    	return nil
    }

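    Because the module parameter of Send() and SendToGroup() is a string, and the Message's Source field is also a string, there is no single variable whose references you can search to jump to the routing logic in the intermediate modules. You have to trace it by eye to find out which module the message finally reaches and what that module does with it. For example, the constants that name the edgecontroller module, all with the value "edgecontroller", currently include: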

    • SrcEdgeController
    • EdgeControllerModuleName
    • EdgeController
    • CloudControlerModule
    • ModuleNameController
    • the string literal "edgecontroller", used directly
    These constants are scattered across different packages and cannot be unified.
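  • Content is typed as the empty interface interface{}, but in most cases its concrete type is []byte
    The payload of Content is usually a concrete k8s object such as v1.Pod. For in-process communication, converting the empty interface back into the struct with a type assertion is very convenient. In practice, though, most KubeEdge messages are cross-process messages between cloud and edge. Sent over the network, a message goes through Message (struct object) -> protobuf(Message) (converted into a protobuf message object) -> (omitted) -> network -> (omitted) -> protobuf(Message) -> Message (struct object). When it is converted into the protobuf message object, Content is JSON-encoded into []byte (unless it is already a []byte or a string, which are copied as bytes) and stored in protobuf.Content. Likewise, when the message is rebuilt from the protobuf message object, Content is assigned from protobuf.Content, so its type is []byte.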

    type Message struct {
    	Header               *MessageHeader `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"`
    	Router               *MessageRouter `protobuf:"bytes,2,opt,name=router,proto3" json:"router,omitempty"`
    	Content              []byte         `protobuf:"bytes,3,opt,name=Content,proto3" json:"Content,omitempty"`
    	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
    	XXX_unrecognized     []byte         `json:"-"`
    	XXX_sizecache        int32          `json:"-"`
    }

    func (t *MessageTranslator) modelToProto(src *model.Message, dst *message.Message) error {
    	dst.Header.ID = src.GetID()
    	dst.Header.ParentID = src.GetParentID()
    	dst.Header.Timestamp = int64(src.GetTimestamp())
    	dst.Header.Sync = src.IsSync()
    	dst.Router.Source = src.GetSource()
    	dst.Router.Group = src.GetGroup()
    	dst.Router.Resouce = src.GetResource()
    	dst.Router.Operaion = src.GetOperation()
    	if content := src.GetContent(); content != nil {
    		// []byte and string are copied as bytes; everything else is JSON-encoded
    		switch c := content.(type) {
    		case []byte:
    			dst.Content = c
    		case string:
    			dst.Content = []byte(c)
    		default:
    			bytes, err := json.Marshal(c)
    			if err != nil {
    				klog.Error("failed to marshal")
    				return err
    			}
    			dst.Content = bytes
    		}
    	}
    	return nil
    }
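The effect on the receiver can be shown with a round trip that reuses the type switch from modelToProto. Pod is a stand-in for v1.Pod and encodeContent is a hypothetical helper mirroring modelToProto's Content handling; the protobuf and network steps are reduced to handing the bytes over.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Pod is a minimal stand-in for a Kubernetes object such as v1.Pod.
type Pod struct {
	Name      string `json:"name"`
	Namespace string `json:"namespace"`
}

// encodeContent mirrors modelToProto: []byte and string pass through as
// bytes, anything else is JSON-encoded. Hypothetical helper.
func encodeContent(content interface{}) ([]byte, error) {
	switch c := content.(type) {
	case []byte:
		return c, nil
	case string:
		return []byte(c), nil
	default:
		return json.Marshal(c)
	}
}

func main() {
	// Sender side: Content holds a struct value.
	var sent interface{} = Pod{Name: "nginx", Namespace: "default"}
	wire, err := encodeContent(sent)
	if err != nil {
		panic(err)
	}

	// Receiver side: Content is rebuilt from the protobuf bytes field.
	var received interface{} = wire

	// A type assertion to the original struct no longer works...
	if _, ok := received.(Pod); !ok {
		fmt.Printf("type assertion to Pod fails; Content is %T\n", received)
	}
	// ...so the struct has to be recovered with json.Unmarshal.
	var pod Pod
	if err := json.Unmarshal(received.([]byte), &pod); err != nil {
		panic(err)
	}
	fmt.Println(pod.Name, pod.Namespace)
}
```

The program reports the Content type as []uint8 (Go's name for []byte) and then prints "nginx default", which is exactly the trap the issue describes for developers who try a type assertion.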

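    In short, for any message that crosses between cloud and edge, Message.Content on the receiving side is always a []byte. Existing code that consumes Message.Content also assumes it is []byte and recovers the struct with json.Unmarshal; if it is not []byte, it is first forced through json.Marshal into []byte and then json.Unmarshal-ed. Given that, should Content be declared explicitly as []byte, so that new developers do not go wrong here (trying a type assertion instead of recovering the struct with json.Unmarshal)?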

    func (uc *UpstreamController) updateNodeStatus() {
    		var data []byte
    		switch msg.Content.(type) {
    		case []byte:
    			data = msg.GetContent().([]byte)
    		default:
    			var err error
    			data, err = json.Marshal(msg.GetContent())
    			if err != nil {
    				klog.Warningf("message: %s process failure, marshal message content with error: %s", msg.GetID(), err)
    				continue
    			}
    		}
    ...
    			node := &v1.Node{}
    			err = json.Unmarshal(data, node)
    ...
    }
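The []byte-or-marshal pattern in updateNodeStatus could be collected into one helper, which also makes the cross-process case explicit at every call site. decodeContent and Node are hypothetical names, not KubeEdge APIs; if Content were declared as []byte, as proposed above, the default branch could be dropped.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Node is a minimal stand-in for v1.Node.
type Node struct {
	Name string `json:"name"`
}

// decodeContent centralizes the pattern from updateNodeStatus: use Content
// directly when it is already []byte, otherwise JSON-encode it first, then
// JSON-decode into out. Hypothetical helper.
func decodeContent(content interface{}, out interface{}) error {
	var data []byte
	switch c := content.(type) {
	case []byte:
		data = c // cross-process case: bytes from protobuf.Content
	default:
		var err error
		if data, err = json.Marshal(c); err != nil { // in-process case
			return fmt.Errorf("marshal content: %w", err)
		}
	}
	if err := json.Unmarshal(data, out); err != nil {
		return fmt.Errorf("unmarshal content: %w", err)
	}
	return nil
}

func main() {
	var fromCloud interface{} = []byte(`{"name":"edgenode-foo"}`)
	var inProcess interface{} = Node{Name: "edgenode-bar"}

	for _, content := range []interface{}{fromCloud, inProcess} {
		var n Node
		if err := decodeContent(content, &n); err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(n.Name)
	}
}
```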