Skip to content

Commit

Permalink
wip: porter strategy
Browse files Browse the repository at this point in the history
Signed-off-by: Carolyn Van Slyck <[email protected]>
  • Loading branch information
carolynvs committed Nov 14, 2022
1 parent 89f0c59 commit bb1397e
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 27 deletions.
46 changes: 44 additions & 2 deletions pkg/cnab/extensions/dependencies/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ type DependencySource struct {
Output string `json:"output,omitempty" mapstructure:"output,omitempty"`
}

var dependencySourceWiringRegex = regexp.MustCompile(`bundle(\.dependencies\.([^.]+))?\.([^.]+)\.(.+)`)

// ParseDependencySource identifies the components specified in a wiring string.
func ParseDependencySource(value string) (DependencySource, error) {
// TODO(PEP003): At build time, check if a dependency source was defined with templating and error out
// e.g. ${bundle.parameters.foo} should be bundle.parameters.foo

regex := regexp.MustCompile(`bundle(\.dependencies\.([^.]+))?\.([^.]+)\.(.+)`)
matches := regex.FindStringSubmatch(value)
matches := dependencySourceWiringRegex.FindStringSubmatch(value)

// If it doesn't match our wiring syntax, assume that it is a hard coded value
if matches == nil || len(matches) < 5 {
Expand Down Expand Up @@ -162,6 +163,47 @@ func (s DependencySource) WiringSuffix() string {
return s.Value
}

type WorkflowWiring struct {
WorkflowID string
JobKey string
Parameter string
Credential string
Output string
}

var workflowWiringRegex = regexp.MustCompile(`workflow\.([^\.]+)\.jobs\.([^\.]+)\.([^\.]+)\.(.+)`)

func ParseWorkflowWiring(value string) (WorkflowWiring, error) {
matches := workflowWiringRegex.FindStringSubmatch(value)
if len(matches) < 5 {
return WorkflowWiring{}, fmt.Errorf("invalid workflow wiring was passed to the porter strategy, %s", value)
}

// the first group is the entire match, we don't care about it
workflowID := matches[1]
jobKey := matches[2]
dataType := matches[3] // e.g. parameters, credentials or outputs
dataKey := matches[4] // e.g. the name of the param/cred/output

wiring := WorkflowWiring{
WorkflowID: workflowID,
JobKey: jobKey,
}

switch dataType {
case "parameters":
wiring.Parameter = dataKey
case "credentials":
wiring.Credential = dataKey
case "outputs":
wiring.Output = dataKey
default:
return WorkflowWiring{}, fmt.Errorf("invalid workflow wiring was passed to the porter strategy, %s", value)
}

return wiring, nil
}

type DependencyInstallation struct {
Labels map[string]string `json:"labels,omitempty" mapstructure:"labels,omitempty"`
Criteria *InstallationCriteria `json:"criteria,omitempty" mapstructure:"criteria,omitempty"`
Expand Down
68 changes: 66 additions & 2 deletions pkg/cnab/extensions/dependencies/v2/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
func TestDependencySource(t *testing.T) {
t.Parallel()

jobKey := "1"
testcases := []struct {
name string
bundleWiring string
Expand Down Expand Up @@ -89,8 +90,71 @@ func TestDependencySource(t *testing.T) {
require.Equal(t, tc.bundleWiring, gotBundleWiring, "incorrect bundle wiring was returned")

// Check that we can convert to a workflow wiring form
gotWorkflowWiring := gotSource.AsWorkflowWiring("1")
require.Equal(t, tc.wantWorkflowWiring, gotWorkflowWiring, "incorrect workflow wiring was returned")
gotWorkflowWiringValue := gotSource.AsWorkflowWiring(jobKey)
require.Equal(t, tc.wantWorkflowWiring, gotWorkflowWiringValue, "incorrect workflow wiring string value was returned")
} else {
tests.RequireErrorContains(t, err, tc.wantErr)
}
})
}
}

func TestParseWorkflowWiring(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
wiringStr string
wantWorkflowWiring WorkflowWiring
wantErr string
}{
{ // Check that we can still pass hard-coded values in a workflow
name: "value not supported",
wiringStr: "11",
wantErr: "invalid workflow wiring",
},
{
name: "parameter",
wiringStr: "workflow.abc123.jobs.myjerb.parameters.logLevel",
wantWorkflowWiring: WorkflowWiring{
WorkflowID: "abc123",
JobKey: "myjerb",
Parameter: "logLevel",
},
},
{
name: "credential",
wiringStr: "workflow.myworkflow.jobs.root.credentials.kubeconfig",
wantWorkflowWiring: WorkflowWiring{
WorkflowID: "myworkflow",
JobKey: "root",
Credential: "kubeconfig",
},
},
{
name: "output",
wiringStr: "workflow.abc123.jobs.mydb.outputs.connstr",
wantWorkflowWiring: WorkflowWiring{
WorkflowID: "abc123",
JobKey: "mydb",
Output: "connstr",
},
},
{
name: "dependencies not allowed",
wiringStr: "workflow.abc123.jobs.root.dependencies.mydb.outputs.connstr",
wantErr: "invalid workflow wiring",
},
}

for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

gotWiring, err := ParseWorkflowWiring(tc.wiringStr)
if tc.wantErr == "" {
require.Equal(t, tc.wantWorkflowWiring, gotWiring, "incorrect WorkflowWiring was parsed")
} else {
tests.RequireErrorContains(t, err, tc.wantErr)
}
Expand Down
53 changes: 53 additions & 0 deletions pkg/porter/porter_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package porter

import (
"context"
"fmt"
"regexp"

"get.porter.sh/porter/pkg/storage"

v2 "get.porter.sh/porter/pkg/cnab/extensions/dependencies/v2"
"get.porter.sh/porter/pkg/tracing"
)

// PorterSecretStrategy knows how to resolve specially formatted wiring strings
// such as workflow.jobs.db.outputs.connstr from Porter instead of from a plugin.
// It is not written as a plugin because it is much more straightforward to
// retrieve the data already loaded in the running Porter instance than to start
// another one, load its config and requery the database.
type PorterSecretStrategy struct {
installations storage.InstallationProvider
}

// regular expression for parsing a workflow wiring string, such as workflow.jobs.db.outputs.connstr
var workflowWiringRegex = regexp.MustCompile(`workflow\.jobs\.([^\.]+)\.(.+)`)

func (s PorterSecretStrategy) Resolve(ctx context.Context, keyName string, keyValue string) (string, error) {
ctx, span := tracing.StartSpan(ctx)
defer span.EndSpan()

// TODO(PEP003): It would be great when we configure this strategy that we also do host, so that host secret resolution isn't deferred to the plugins
// i.e. we can configure a secret strategy and still be able to resolve directly in porter any host values.
if keyName != "porter" {
return "", fmt.Errorf("attempted to resolve secrets of type %s from the porter strategy", keyName)
}

wiring, err := v2.ParseWorkflowWiring(keyValue)
if err != nil {
return "", fmt.Errorf("invalid workflow wiring was passed to the porter strategy, %s", keyValue)
}

// TODO(PEP003): How do we want to re-resolve credentials passed to the root bundle? They aren't recorded so it's not a simple lookup
if wiring.Parameter != "" {
// TODO(PEP003): Resolve a parameter from another job that has not run yet
// 1. Find the workflow definition from the db (need a way to track "current" workflow)
// 2. Grab the job based on the jobid in the workflow wiring
// 3. First check the parameters field for the param, resolve just that if available, otherwise resolve parameter sets and get it from there
// it sure would help if we remembered what params are in each set
} else if wiring.Output != "" {
// TODO(PEP003): Resolve the output from an already executed job
}

panic("not implemented")
}
15 changes: 8 additions & 7 deletions pkg/porter/workflow_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t Engine) CreateWorkflow(ctx context.Context, opts CreateWorkflowOptions)
}

// Now build job definitions for each node in the graph
jobs := make(map[string]storage.Job, len(nodes))
jobs := make(map[string]*storage.Job, len(nodes))
for _, node := range nodes {
switch tn := node.(type) {
case depsv2.BundleNode:
Expand Down Expand Up @@ -117,7 +117,7 @@ func (t Engine) CreateWorkflow(ctx context.Context, opts CreateWorkflowOptions)
}
}

jobs[tn.Key] = storage.Job{
jobs[tn.Key] = &storage.Job{
Action: cnab.ActionInstall, // TODO(PEP003): eventually this needs to support all actions
Installation: inst,
Depends: requiredJobs,
Expand Down Expand Up @@ -211,8 +211,8 @@ func (t Engine) executeStage(ctx context.Context, w storage.Workflow, stageIndex
return fmt.Errorf("could not sort jobs in stage")
}

availableJobs := make(chan storage.Job, len(s.Jobs))
completedJobs := make(chan storage.Job, len(s.Jobs))
availableJobs := make(chan *storage.Job, len(s.Jobs))
completedJobs := make(chan *storage.Job, len(s.Jobs))

// Default the number of parallel jobs to the number of CPUs
// This gives us 1 CPU per invocation image.
Expand All @@ -235,6 +235,7 @@ func (t Engine) executeStage(ctx context.Context, w storage.Workflow, stageIndex
return
case job := <-availableJobs:
jobsInProgress.Go(func() error {
// TODO(PEP003) why do we have to look this up again?
return t.executeJob(ctx, s.Jobs[job.Key], completedJobs)
})
}
Expand Down Expand Up @@ -283,15 +284,15 @@ func (t Engine) executeStage(ctx context.Context, w storage.Workflow, stageIndex
return err
}

func (t Engine) queueAvailableJobs(ctx context.Context, s storage.Stage, sortedNodes []depsv2.Node, availableJobs chan storage.Job) bool {
func (t Engine) queueAvailableJobs(ctx context.Context, s storage.Stage, sortedNodes []depsv2.Node, availableJobs chan *storage.Job) bool {
// Walk through the graph in sorted order (bottom up)
// if the node's dependencies are all successful, schedule it
// as soon as it's not schedule, stop looking because none of the remainder will be either
var i int
for i = 0; i < len(sortedNodes); i++ {
node := sortedNodes[i]

job := node.(storage.Job)
job := node.(*storage.Job)
switch job.Status.Status {
case cnab.StatusFailed:
// stop scheduling more jobs
Expand Down Expand Up @@ -320,7 +321,7 @@ func (t Engine) queueAvailableJobs(ctx context.Context, s storage.Stage, sortedN
return i >= len(sortedNodes)
}

func (t Engine) executeJob(ctx context.Context, j storage.Job, jobs chan storage.Job) error {
func (t Engine) executeJob(ctx context.Context, j *storage.Job, jobs chan *storage.Job) error {
ctx, span := tracing.StartSpan(ctx, tracing.ObjectAttribute("job", j))
defer span.EndSpan()

Expand Down
6 changes: 6 additions & 0 deletions pkg/secrets/plugin_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ func (a PluginAdapter) Close() error {
}

func (a PluginAdapter) Resolve(ctx context.Context, keyName string, keyValue string) (string, error) {
// Instead of calling out to a plugin, resolve the value from Porter's database
// This supports bundle workflows where we are sourcing data from other runs, e.g. passing a connection string from a dependency to another bundle
if keyName == "porter" {

}

return a.plugin.Resolve(ctx, keyName, keyValue)
}

Expand Down
41 changes: 31 additions & 10 deletions pkg/storage/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,38 @@ type WorkflowSpec struct {
MaxParallel int `json:"maxParallel"`

// DebugMode tweaks how the workflow is run to make it easier to debug
DebugMode bool `json:"DebugMode"`
DebugMode bool `json:"debugMode"`
}

// TODO(PEP003): Figure out what needs to be persisted, and how to persist multiple or continued runs
type WorkflowStatus struct {
}

// Prepare updates the internal data representation of the workflow before running it.
func (w Workflow) Prepare() {
for _, stage := range w.Stages {
for jobKey, job := range stage.Jobs {
job.Key = jobKey
stage.Jobs[jobKey] = job
}
func (w *Workflow) Prepare() {
// Assign an id to the workflow
w.ID = cnab.NewULID()

// Update any workflow wiring to use the workflow id?

for _, s := range w.Stages {
s.Prepare(w.ID)
}
}

// Stage represents a set of jobs that should run, possibly in parallel.
type Stage struct {
// Jobs is the set of bundles to execute, keyed by the job name.
Jobs map[string]Job `json:"jobs"`
Jobs map[string]*Job `json:"jobs"`
}

func (s *Stage) Prepare(workflowID string) {
// Update the jobs so that they know their job key (since they won't be used within the larger workflow, but as independent jobs)
for jobKey, job := range s.Jobs {
job.Prepare(workflowID, jobKey)
s.Jobs[jobKey] = job
}

}

// Job represents the execution of a bundle.
Expand All @@ -70,14 +81,24 @@ type Job struct {
Status JobStatus `json:"status,omitempty"`
}

func (j Job) GetRequires() []string {
func (j *Job) GetRequires() []string {
return j.Depends
}

func (j Job) GetKey() string {
func (j *Job) GetKey() string {
return j.Key
}

func (j *Job) Prepare(workflowId string, jobKey string) {
j.Key = jobKey
for _, param := range j.Installation.Parameters.Parameters {
if param.Source.Key != "porter" {
continue
}

}
}

type JobStatus struct {
Status string `json:"status"`
Message string `json:"message"`
Expand Down
12 changes: 6 additions & 6 deletions tests/integration/testdata/workflow/mybuns.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
schemaVersion: 1.0.0-alpha.1
name: "apply mybuns.yaml"
maxparallel: 1
debugmode: false
maxParallel: 1
debugMode: false
stages:
- jobs:
root:
action: install
installation:
schemaversion: 1.0.2
schemaVersion: 1.0.2
name: mybuns
namespace: dev
bundle:
Expand All @@ -28,7 +28,7 @@ stages:
root/db:
action: install
installation:
schemaversion: 1.0.2
schemaVersion: 1.0.2
name: mybuns/db
namespace: dev
bundle:
Expand All @@ -37,5 +37,5 @@ stages:
tag: v0.1.0
parameters:
- name: database
source:
value: bigdb
source:
value: bigdb

0 comments on commit bb1397e

Please sign in to comment.