Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions agreementbot/consumer_protocol_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
return true, true, true
}

no_new_priority := true

// for every priority (in order highest to lowest) in the new policy with priority lower than the current wl
// if it's not in the old policy, cancel
choice := -1
Expand All @@ -474,9 +476,10 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
}

if currentWL := policy.GetWorkloadWithPriority(busPol.Workloads, wlUsagePriority); currentWL == nil {
// the current workload priority is no longer in the deployment policy
// the current workload priority is no longer in the deployment policy, need a service upgrade??
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("current workload priority %v is no longer in policy for agreement %v", wlUsagePriority, ag.CurrentAgreementId)))
return true, false, false
no_new_priority = false
//return true, false, false
} else {
wl = currentWL
}
Expand All @@ -487,7 +490,9 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
matchingWL := policy.GetWorkloadWithPriority(oldPolicy.Workloads, choice)
if matchingWL == nil || !matchingWL.IsSame(*nextPriority) {
glog.Infof(BCPHlogstring(b.Name(), fmt.Sprintf("Higher priority version added or modified. Cancelling agreement %v", ag.CurrentAgreementId)))
return true, false, false
no_new_priority = false
break
//return true, false, false
}
nextPriority = policy.GetNextWorkloadChoice(busPol.Workloads, choice)
}
Expand Down Expand Up @@ -535,15 +540,23 @@ func (b *BaseConsumerProtocolHandler) HandlePolicyChangeForAgreement(ag persiste
}
}

// will return here if business policy content doesn't change
updateType := basicprotocol.MsgUpdateTypePolicyChange

newTsCs, err := policy.Create_Terms_And_Conditions(producerPol, consumerPol, wl, ag.CurrentAgreementId, b.config.AgreementBot.DefaultWorkloadPW, b.config.AgreementBot.NoDataIntervalS, basicprotocol.PROTOCOL_CURRENT_VERSION)
if err != nil {
glog.Errorf(BCPHlogstring(b.Name(), fmt.Sprintf("error creating new terms and conditions: %v", err)))
return false, false, false
}

if !no_new_priority {
//newTsCs.Workloads = busPol.Workloads
updateType = basicprotocol.MsgUpdateTypeServiceUpgrade
}

ag.LastPolicyUpdateTime = uint64(time.Now().Unix())

b.UpdateAgreement(&ag, basicprotocol.MsgUpdateTypePolicyChange, newTsCs, cph)
b.UpdateAgreement(&ag, updateType, newTsCs, cph)

return true, true, true
}
Expand Down
11 changes: 7 additions & 4 deletions agreementbot/persistence/workload_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type WorkloadUsage struct {
DisableRetry bool `json:"disable_retry"` // when true, retry and retry durations are disbled which effectively disables workload rollback
VerifiedDurationS int `json:"verified_durations"` // the number of seconds for successful data verification before disabling workload rollback retries
ReqsNotMet bool `json:"requirements_not_met"` // this workload usage record is not at the highest priority because the device did not meet the API spec requirements at one of the higher priorities
UpgradeStategy string `json:"upgrade_strategy"` // The upgrade strategy to upgrade to this version
}

func (w WorkloadUsage) String() string {
Expand All @@ -37,9 +38,10 @@ func (w WorkloadUsage) String() string {
"DisableRetry: %v, "+
"VerifiedDurationS: %v, "+
"ReqsNotMet: %v, "+
"Policy: %v",
"Policy: %v, "+
"UpgradeStategy: %v",
w.Id, w.DeviceId, w.PendingUpgradeTime, w.PolicyName, w.Priority, w.RetryCount,
w.RetryDurationS, w.CurrentAgreementId, w.FirstTryTime, w.LatestRetryTime, w.DisableRetry, w.VerifiedDurationS, w.ReqsNotMet, w.Policy)
w.RetryDurationS, w.CurrentAgreementId, w.FirstTryTime, w.LatestRetryTime, w.DisableRetry, w.VerifiedDurationS, w.ReqsNotMet, w.Policy, w.UpgradeStategy)
}

func (w WorkloadUsage) ShortString() string {
Expand All @@ -55,9 +57,10 @@ func (w WorkloadUsage) ShortString() string {
"LatestRetryTime: %v, "+
"DisableRetry: %v, "+
"VerifiedDurationS: %v, "+
"ReqsNotMet: %v",
"ReqsNotMet: %v, "+
"UpgradeStategy: %v",
w.Id, w.DeviceId, w.PendingUpgradeTime, w.PolicyName, w.Priority, w.RetryCount,
w.RetryDurationS, w.CurrentAgreementId, w.FirstTryTime, w.LatestRetryTime, w.DisableRetry, w.VerifiedDurationS, w.ReqsNotMet)
w.RetryDurationS, w.CurrentAgreementId, w.FirstTryTime, w.LatestRetryTime, w.DisableRetry, w.VerifiedDurationS, w.ReqsNotMet, w.UpgradeStategy)
}

// private factory method for workloadusage w/out persistence safety:
Expand Down
9 changes: 9 additions & 0 deletions basicprotocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func NewBAgreementVerifyReply(bp *abstractprotocol.BaseProtocolMessage, exists b
// receipt of a rejection.
const MsgUpdateTypeSecret = "basicagreementupdatesecret"
const MsgUpdateTypePolicyChange = "basicagreementtupdatepolicychange"
const MsgUpdateTypeServiceUpgrade = "basicagreementupdateServiceUpgrade"

type BAgreementUpdate struct {
*abstractprotocol.BaseProtocolMessage
Expand Down Expand Up @@ -109,6 +110,10 @@ func (b *BAgreementUpdate) IsPolicyChangeUpdate() bool {
return b.Updatetype == MsgUpdateTypePolicyChange
}

func (b *BAgreementUpdate) IsServiceUpgrade() bool {
return b.Updatetype == MsgUpdateTypeServiceUpgrade
}

func (b *BAgreementUpdate) UpdateType() string {
return b.Updatetype
}
Expand Down Expand Up @@ -148,6 +153,10 @@ func (b *BAgreementUpdateReply) IsPolicyChangeUpdate() bool {
return b.Updatetype == MsgUpdateTypePolicyChange
}

func (b *BAgreementUpdateReply) IsServiceUpgrade() bool {
return b.Updatetype == MsgUpdateTypeServiceUpgrade
}

func (b *BAgreementUpdateReply) IsAccepted() bool {
return b.Accepted
}
Expand Down
6 changes: 4 additions & 2 deletions businesspolicy/business_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,14 @@ func (w WorkloadPriority) String() string {
type UpgradePolicy struct {
Lifecycle string `json:"lifecycle,omitempty"` // immediate, never, agreement
Time string `json:"time,omitempty"` // the time of the upgrade
Strategy string `json:"strategy,omitempty"` // rolling (start new, stop old), recreate (stop old, start new). Default is rolling.
}

func (w UpgradePolicy) String() string {
return fmt.Sprintf("Lifecycle: %v, Time: %v",
return fmt.Sprintf("Lifecycle: %v, Time: %v, Strategy: %v",
w.Lifecycle,
w.Time)
w.Time,
w.Strategy)
}

type WorkloadChoice struct {
Expand Down
40 changes: 40 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ const (

// Service related
SERVICE_CONFIG_STATE_CHANGED EventId = "SERVICE_CONFIG_STATE_CHANGED"
SERVICE_VERSION_UPGRADE EventId = "SERVICE_VERSION_UPGRADE"

// Object Policy related
OBJECT_POLICY_NEW EventId = "OBJECT_POLICY_NEW"
Expand Down Expand Up @@ -2216,6 +2217,45 @@ func NewWorkloadUpdateMessage(id EventId, agreementId string, protocol string, c
}
}

type WorkloadUpgradeMessage struct {
event Event
launchContext *AgreementLaunchContext
// AgreementProtocol string
// AgreementId string
// Deployment persistence.DeploymentConfig
// WorkloadUpdate policy.Workload
}

func (w *WorkloadUpgradeMessage) Event() Event {
return w.event
}

func (w *WorkloadUpgradeMessage) String() string {
//depStr := ""
// if w.Deployment != nil {
// depStr = w.Deployment.ToString()
// }

//return fmt.Sprintf("event: %v, AgreementProtocol: %v, agreementId: %v, deployment: %v, workloadUpdate: %v", w.event, w.AgreementProtocol, w.AgreementId, depStr, w.WorkloadUpdate)
return ""
}

func (w *WorkloadUpgradeMessage) ShortString() string {
return w.String()
}

func NewWorkloadUpgradeMessage(id EventId, agreementId string, protocol string, deployment persistence.DeploymentConfig, workloadUpdate policy.Workload) *WorkloadUpgradeMessage {
return &WorkloadUpgradeMessage{
event: Event{
Id: id,
},
// AgreementId: agreementId,
// AgreementProtocol: protocol,
// Deployment: deployment,
// WorkloadUpdate: workloadUpdate,
}
}

type NMPStartDownloadMessage struct {
event Event
Message StartDownloadMessage
Expand Down
120 changes: 117 additions & 3 deletions governance/governance.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ func (w *GovernanceWorker) CommandHandler(command worker.Command) bool {
// See if it is a proposal first. If it is, ignore it... If not, check if message still exists and then check for the message types this modules cares about
if _, err := protocolHandler.ValidateProposal(protocolMsg); err == nil {
// Gets in here if it is a proposal
glog.V(5).Infof(logString(fmt.Sprintf("Governance handler ignoring proposal message")))
glog.V(5).Infof(logString(fmt.Sprintf("Governance handler ignoring proposal message"))) //???
} else if there, err := w.messageInExchange(exchangeMsg.MsgId); err != nil {
glog.Errorf(logString(fmt.Sprintf("unable to get messages from the exchange, error %v", err)))
w.AddDeferredCommand(cmd)
Expand Down Expand Up @@ -1113,7 +1113,7 @@ func (w *GovernanceWorker) CommandHandler(command worker.Command) bool {
} else {

// Allow the message extension handler to see the message
handled, cancel, agid, updatedSecs, err := w.producerPH[msgProtocol].HandleExtensionMessages(&cmd.Msg, exchangeMsg)
handled, cancel, agid, updatedSecs, updatedWorkload, err := w.producerPH[msgProtocol].HandleExtensionMessages(&cmd.Msg, exchangeMsg)
if err != nil {
glog.Errorf(logString(fmt.Sprintf("unable to handle message %v , error: %v", protocolMsg, err)))
} else if cancel {
Expand Down Expand Up @@ -1167,11 +1167,102 @@ func (w *GovernanceWorker) CommandHandler(command worker.Command) bool {
if len(updatedSecs) != 0 {
clusterNamespaceInAg, err := w.GetRequestedClusterNamespaceFromAg(&ags[0])
if err != nil {
glog.Errorf(logString(fmt.Sprintf("Failed to get cluster namespace from agreeent %v. %v", ags[0].CurrentAgreementId, err)))
glog.Errorf(logString(fmt.Sprintf("Failed to get cluster namespace from agreement %v. %v", ags[0].CurrentAgreementId, err)))
}
// have updatedSecs, send out an event to let kube worker know about the secret update
w.Messages() <- events.NewWorkloadUpdateMessage(events.UPDATE_SECRETS_IN_AGREEMENT, agid, msgProtocol, clusterNamespaceInAg, ags[0].GetDeploymentConfig(), updatedSecs)
}

if updatedWorkload != nil {
// need to set up agreementLaunchContext()
clusterNamespaceInAg, err := w.GetRequestedClusterNamespaceFromAg(&ags[0])
if err != nil {
glog.Errorf(logString(fmt.Sprintf("Failed to get cluster namespace from agreement %v. %v", ags[0].CurrentAgreementId, err)))
}

// get service image auths from the exchange
img_auths, err := w.GetImageAuths(updatedWorkload)
if err != nil {
glog.Errorf(logString(fmt.Sprintf("Failed to get image auth for workload %v in agreement. %v", updatedWorkload, ags[0].CurrentAgreementId, err)))
}

cc := events.NewContainerConfig(updatedWorkload.Deployment, updatedWorkload.DeploymentSignature, updatedWorkload.DeploymentUserInfo,
updatedWorkload.ClusterDeployment, updatedWorkload.ClusterDeploymentSignature, clusterNamespaceInAg, updatedWorkload.DeploymentOverrides, img_auths)

lc := new(events.AgreementLaunchContext)
lc.Configure = *cc
lc.AgreementId = ags[0].CurrentAgreementId
lc.AgreementProtocol = ags[0].AgreementProtocol

// get environmental settings for the workload

// The service config variables are stored in the device's attributes.
if proposal, err := protocolHandler.DemarshalProposal(ags[0].Proposal); err != nil {

} else if tcPolicy, err := policy.DemarshalPolicy(proposal.TsAndCs()); err != nil {

} else {
envAdds, err := w.GetServicePreference(updatedWorkload.WorkloadURL, updatedWorkload.Org, tcPolicy)
if err != nil {
glog.Errorf(logString(fmt.Sprintf("Error getting environment variables from node settings for %v %v: %v", updatedWorkload.WorkloadURL, updatedWorkload.Org, err)))
}

// The workload config we have might be from a lower version of the workload. Go to the exchange and
// get the metadata for the version we are running and then add in any unset default user inputs.
var serviceDef *exchange.ServiceDefinition
serviceId := ""
if _, sDef, allIDs, err := exchange.GetHTTPServiceResolverHandler(w)(updatedWorkload.WorkloadURL, updatedWorkload.Org, updatedWorkload.Version, updatedWorkload.Arch); err != nil {
//return fmt.Errorf(logString(fmt.Sprintf("Received error querying exchange for service metadata: %v/%v, error %v", workload.Org, workload.WorkloadURL, err)))
} else if sDef == nil {
//return fmt.Errorf(logString(fmt.Sprintf("Cound not find service metadata for %v/%v.", workload.Org, workload.WorkloadURL)))
} else {
serviceDef = sDef
sDef.PopulateDefaultUserInput(envAdds)
if allIDs != nil && len(allIDs) > 0 {
serviceId = allIDs[0]
}
}

// create microservice def for this agreement
var msdef *persistence.MicroserviceDefinition
msFilters := []persistence.MSFilter{persistence.UrlOrgVersionMSFilter(serviceDef.URL, exchange.GetOrg(serviceId), serviceDef.Version), persistence.UnarchivedMSFilter()}
if msdefs, err := persistence.FindMicroserviceDefs(w.db, msFilters); err != nil {
//return fmt.Errorf(logString(fmt.Sprintf("Error finding service definition from the local db for %v. %v", serviceId, err)))
} else if msdefs == nil || len(msdefs) == 0 {
vExp, err1 := semanticversion.Version_Expression_Factory(serviceDef.Version)
if err1 != nil {
//return fmt.Errorf("VersionRange %v cannot be converted to a version expression, error %v", serviceDef.Version, err1)
}

if msdef, err = microservice.CreateMicroserviceDefWithServiceDef(w.db, serviceDef, serviceId, vExp.Get_expression()); err != nil {
//return fmt.Errorf(logString(fmt.Sprintf("failed to create service definition for %v for agreement %v: %v", serviceId, proposal.AgreementId(), err)))
}
} else {
msdef = &msdefs[0]
}

cutil.SetPlatformEnvvars(envAdds,
config.ENVVAR_PREFIX,
proposal.AgreementId(),
exchange.GetId(w.GetExchangeId()),
exchange.GetOrg(w.GetExchangeId()),
w.GetExchangeURL(),
w.devicePattern,
w.BaseWorker.Manager.Config.GetFileSyncServiceProtocol(),
w.BaseWorker.Manager.Config.GetFileSyncServiceAPIListen(),
strconv.Itoa(int(w.BaseWorker.Manager.Config.GetFileSyncServiceAPIPort())))

lc.EnvironmentAdditions = &envAdds

if err := w.processServiceSecrets(tcPolicy, proposal.AgreementId()); err != nil {

}

w.Messages() <- events.NewWorkloadUpgradeMessage(events.SERVICE_VERSION_UPGRADE, agid, msgProtocol, ags[0].GetDeploymentConfig(), *updatedWorkload)

}

}
}
}

Expand Down Expand Up @@ -2144,3 +2235,26 @@ func (w *GovernanceWorker) GetRequestedClusterNamespaceFromAg(ag *persistence.Es
return tcPolicy.ClusterNamespace, nil
}
}

func (w *GovernanceWorker) GetImageAuths(workload *policy.Workload) ([]events.ImageDockerAuth, error) {
img_auths := make([]events.ImageDockerAuth, 0)
if w.deviceType == persistence.DEVICE_TYPE_DEVICE {
if w.Config.Edge.TrustDockerAuthFromOrg {
if ias, err := exchange.GetHTTPServiceDockerAuthsHandler(w)(workload.WorkloadURL, workload.Org, workload.Version, workload.Arch); err != nil {
return img_auths, errors.New(logString(fmt.Sprintf("received error querying exchange for service image auths: %v, error %v", workload, err)))
} else {
if ias != nil {
for _, iau_temp := range ias {
username := iau_temp.UserName
if username == "" {
username = "token"
}
img_auths = append(img_auths, events.ImageDockerAuth{Registry: iau_temp.Registry, UserName: username, Password: iau_temp.Token})
}
}
}
}
}

return img_auths, nil
}
17 changes: 15 additions & 2 deletions governance/microservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,21 @@ func (w *GovernanceWorker) handleMicroserviceUpgrade(msdef_id string) {
persistence.NewMessageMeta(EL_GOV_ERR_RETRIEVE_SDEFS_FROM_DB, msdef_id, err.Error()),
persistence.EC_DATABASE_ERROR)
glog.Errorf(logString(fmt.Sprintf("error getting service definitions %v from db. %v", msdef_id, err)))
} else if microservice.MicroserviceReadyForUpgrade(msdef, w.db) {
// find the new ms def to upgrade to
} else {

}


else if microservice.MicroserviceReadyForUpgrade(msdef, w.db) { // this is only for dependent service upgrade
// case 1: dependent service: call microservice.MicroserviceReadyForUpgrade(msdef, w.db): find the new ms def to upgrade to

// case 2: top level service: get the version need to be upgrade to from bp?
// 1. get 1) upgrade policy 2) version to upgrade from bp or from microserviceDef
// 2. call UpgradeMicroservice()
// 1) stop old, start new -> like today
// 2) start new, record start time. subworker keep checking runningDuration = currentTime - startTime for all microserviceDefs which upgradeStarting is true. Once runningDuration > new_service_up_durations, stop old service
// 3) Once finish upgrade, set upgradeStarting to false

if new_msdef, err := microservice.GetUpgradeMicroserviceDef(exchange.GetHTTPServiceResolverHandler(w.limitedRetryEC), msdef, w.db); err != nil {
glog.Errorf(logString(fmt.Sprintf("Error finding the new service definition to upgrade to for %v/%v version %v. %v", msdef.Org, msdef.SpecRef, msdef.Version, err)))
} else if new_msdef == nil {
Expand Down
5 changes: 5 additions & 0 deletions imagefetch/image_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (w *ImageFetchWorker) NewEvent(incoming events.Message) {
fCmd := w.NewFetchCommand(msg.LaunchContext())
w.Commands <- fCmd

case *events.WorkloadUpgradeMessage:
msg, _ := incoming.(*events.WorkloadUpgradeMessage)

fCmd := w.NewFetchCommand(msg.LaunchContext())

case *events.LoadContainerMessage:
msg, _ := incoming.(*events.LoadContainerMessage)

Expand Down
2 changes: 1 addition & 1 deletion microservice/microservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func MicroserviceReadyForUpgrade(msdef *persistence.MicroserviceDefinition, db *
if msi.IsAgreementLess() {
return false
// never upgrade top level services, they are controlled by the agbot.
} else if msi.IsTopLevelService() {
} else if msi.IsTopLevelService() { // Need to remove this check
return false
} else if !msdef.ActiveUpgrade && msi.GetServiceDefId() == msdef.Id && msi.GetCleanupStartTime() == 0 {
// If the service can only be upgraded when there are no agreements, check for agreements.
Expand Down
Loading