Skip to content

Commit

Permalink
collaboration capable workflow - first pass (#4336)
Browse files Browse the repository at this point in the history
Co-authored-by: Yohann Paris <[email protected]>
  • Loading branch information
mwdchang and YohannParis authored Aug 12, 2024
1 parent b2637fb commit 2086dd0
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<template #data>
<ContextMenu ref="contextMenu" :model="contextMenuItems" style="white-space: nowrap; width: auto" />
<tera-canvas-item
v-for="node in wf.nodes"
v-for="node in wf.nodes.filter((n) => n.isDeleted !== true)"
:key="node.id"
:style="{
width: `${node.width}px`,
Expand Down Expand Up @@ -135,12 +135,12 @@
fill="none"
/>
<path
v-for="edge of wf.edges"
v-for="edge of wf.edges.filter((e) => e.isDeleted !== true)"
:key="edge.id"
:d="drawPath(interpolatePointsForCurve(edge.points[0], edge.points[1]))"
stroke="#667085"
stroke-width="2"
marker-start="url(#circle)"
:key="edge.id"
fill="none"
/>
</template>
Expand Down Expand Up @@ -670,6 +670,7 @@ function removeEdges(portId: string) {
const nodeMap = new Map<WorkflowNode<any>['id'], WorkflowNode<any>>(wf.value.nodes.map((node) => [node.id, node]));
const nodeCache = new Map<WorkflowOutput<any>['id'], WorkflowNode<any>[]>();
wf.value.edges.forEach((edge) => {
if (edge.isDeleted === true) return;
if (!edge.source || !edge.target) return;
if (!nodeCache.has(edge.source)) nodeCache.set(edge.source, []);
nodeCache.get(edge.source)?.push(nodeMap.get(edge.target) as WorkflowNode<any>);
Expand Down
34 changes: 24 additions & 10 deletions packages/client/hmi-client/src/services/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export const addEdge = (
// Check if edge already exist
const existingEdge = wf.edges.find(
(d) =>
d.isDeleted !== true &&
d.source === sourceId &&
d.sourcePortId === sourcePortId &&
d.target === targetId &&
Expand Down Expand Up @@ -240,11 +241,17 @@ export const removeEdge = (wf: Workflow, id: string) => {
targetPort.type = targetPort.originalType;
}

// Tombstone
const edge = wf.edges.find((e) => e.id === id);
if (edge) {
edge.isDeleted = true;
}

// Edge re-assignment
wf.edges = wf.edges.filter((edge) => edge.id !== id);
// wf.edges = wf.edges.filter((edge) => edge.id !== id);

// If there are no more references reset the connected status of the source node
if (_.isEmpty(wf.edges.filter((e) => e.source === edgeToRemove.source))) {
if (_.isEmpty(wf.edges.filter((e) => e.isDeleted !== true && e.source === edgeToRemove.source))) {
const sourceNode = wf.nodes.find((d) => d.id === edgeToRemove.source);
if (!sourceNode) return;
const sourcePort = sourceNode.outputs.find((d) => d.id === edgeToRemove.sourcePortId);
Expand All @@ -261,18 +268,24 @@ export const removeNode = (wf: Workflow, id: string) => {
removeEdge(wf, edgeId);
});

// Tombstone
const node = wf.nodes.find((n) => n.id === id);
if (node) {
node.isDeleted = true;
}

// Remove the node
wf.nodes = wf.nodes.filter((node) => node.id !== id);
// wf.nodes = wf.nodes.filter((node) => node.id !== id);
};

export const updateNodeState = (wf: Workflow, nodeId: string, state: any) => {
const node = wf.nodes.find((d) => d.id === nodeId);
const node = wf.nodes.find((d) => d.id === nodeId && d.isDeleted !== true);
if (!node) return;
node.state = state;
};

export const updateNodeStatus = (wf: Workflow, nodeId: string, status: OperatorStatus) => {
const node = wf.nodes.find((d) => d.id === nodeId);
const node = wf.nodes.find((d) => d.id === nodeId && d.isDeleted !== true);
if (!node) return;
node.status = status;
};
Expand Down Expand Up @@ -452,7 +465,7 @@ export function selectOutput(
operator.active = selected.id;

// If this output is connected to input port(s), update the input port(s)
const hasOutgoingEdges = wf.edges.some((edge) => edge.source === operator.id);
const hasOutgoingEdges = wf.edges.some((edge) => edge.source === operator.id && edge.isDeleted !== true);
if (!hasOutgoingEdges) return;

selected.status = WorkflowPortStatus.CONNECTED;
Expand All @@ -462,6 +475,7 @@ export function selectOutput(
nodeCache.set(operator.id, []);

wf.edges.forEach((edge) => {
if (edge.isDeleted === true) return;
// Update the input port of the direct target node
if (edge.source === operator.id) {
const targetNode = wf.nodes.find((node) => node.id === edge.target);
Expand Down Expand Up @@ -568,7 +582,7 @@ export const isWorkflowNodeDirty = (node: WorkflowNode<any>): boolean => {
* */
export const branchWorkflow = (wf: Workflow, nodeId: string) => {
// 1. Find anchor point
const anchor = wf.nodes.find((n) => n.id === nodeId);
const anchor = wf.nodes.find((n) => n.id === nodeId && n.isDeleted !== true);
if (!anchor) return;

// 2. Collect the subgraph that we want to copy
Expand All @@ -580,12 +594,12 @@ export const branchWorkflow = (wf: Workflow, nodeId: string) => {
// basically depth-first-search
while (stack.length > 0) {
const id = stack.pop();
const node = wf.nodes.find((n) => n.id === id);
const node = wf.nodes.find((n) => n.id === id && n.isDeleted !== true);
if (node) copyNodes.push(_.cloneDeep(node));
processed.add(id as string);

// Grab downstream edges
const edges = wf.edges.filter((e) => e.source === id);
const edges = wf.edges.filter((e) => e.source === id && e.isDeleted !== true);
edges.forEach((edge) => {
const newId = edge.target as string;
if (!processed.has(newId)) {
Expand All @@ -596,7 +610,7 @@ export const branchWorkflow = (wf: Workflow, nodeId: string) => {
}

// 3. Collect the upstream edges of the anchor
const upstreamEdges = wf.edges.filter((edge) => edge.target === anchor.id);
const upstreamEdges = wf.edges.filter((edge) => edge.target === anchor.id && edge.isDeleted !== true);
upstreamEdges.forEach((edge) => {
copyEdges.push(_.cloneDeep(edge));
});
Expand Down
9 changes: 7 additions & 2 deletions packages/client/hmi-client/src/types/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ export interface BaseState {
export interface WorkflowNode<S> {
// Information
id: string;
displayName: string;
workflowId: string;
isDeleted?: boolean;
version?: number;

displayName: string;
operationType: string;
documentationUrl?: string;
imageUrl?: string;
Expand All @@ -124,8 +127,10 @@ export interface WorkflowNode<S> {
export interface WorkflowEdge {
id: string;
workflowId: string;
points: Position[];
isDeleted?: boolean;
version?: number;

points: Position[];
source?: WorkflowNode<any>['id'];
sourcePortId?: string;

Expand Down
53 changes: 31 additions & 22 deletions packages/client/hmi-client/tests/unit/services/workflow.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ const workflow: Workflow = {
edges: []
};

// eslint-disable-next-line
/** Returns only the live (non-tombstoned) edges of the given workflow. */
const _edges = (wf: Workflow) => wf.edges.filter((edge) => edge.isDeleted !== true);
// eslint-disable-next-line
/** Returns only the live (non-tombstoned) nodes of the given workflow. */
const _nodes = (wf: Workflow) => wf.nodes.filter((node) => node.isDeleted !== true);

const operationLib = new Map<string, Operation>();
operationLib.set('add', addOperation);

Expand Down Expand Up @@ -183,41 +192,41 @@ describe('workflow copying branch -< fork', () => {
workflowService.addEdge(wf, n3.id, 'n3o', n5.id, 'n5i', []);

it('bootstrapped workflow programmatically', () => {
expect(wf.nodes.length).to.eq(5);
expect(wf.edges.length).to.eq(4);
expect(_nodes(wf).length).to.eq(5);
expect(_edges(wf).length).to.eq(4);
expect(sanityCheck(wf)).to.eq(true);
});

it('duplicate linear flow', () => {
const testWf = _.cloneDeep(wf);
workflowService.branchWorkflow(testWf, n1.id);
expect(testWf.nodes.length).to.eq(10);
expect(testWf.edges.length).to.eq(8);
expect(_nodes(testWf).length).to.eq(10);
expect(_edges(testWf).length).to.eq(8);
expect(sanityCheck(testWf)).to.eq(true);
});

it('duplicate tail operator', () => {
const testWf = _.cloneDeep(wf);
workflowService.branchWorkflow(testWf, n5.id);
expect(testWf.nodes.length).to.eq(6);
expect(testWf.edges.length).to.eq(5);
expect(_nodes(testWf).length).to.eq(6);
expect(_edges(testWf).length).to.eq(5);
expect(sanityCheck(testWf)).to.eq(true);
});

it('duplicate at fork', () => {
const testWf = _.cloneDeep(wf);
workflowService.branchWorkflow(testWf, n3.id);
expect(testWf.nodes.length).to.eq(8);
expect(testWf.edges.length).to.eq(7);
expect(_nodes(testWf).length).to.eq(8);
expect(_edges(testWf).length).to.eq(7);
expect(testWf.edges.filter((edge) => edge.source === n2.id).length).to.eq(2);
expect(sanityCheck(testWf)).to.eq(true);
});

it('bad duplication', () => {
const testWf = _.cloneDeep(wf);
workflowService.branchWorkflow(testWf, 'does not exist');
expect(testWf.nodes.length).to.eq(5);
expect(testWf.edges.length).to.eq(4);
expect(_nodes(testWf).length).to.eq(5);
expect(_edges(testWf).length).to.eq(4);
});
});

Expand Down Expand Up @@ -270,18 +279,18 @@ describe('workflow copying branch >- fork', () => {
workflowService.addEdge(wf, n3.id, 'n3o', n4.id, 'n4i', []);

it('bootstrapped workflow programmatically', () => {
expect(wf.nodes.length).to.eq(4);
expect(wf.edges.length).to.eq(3);
expect(_nodes(wf).length).to.eq(4);
expect(_edges(wf).length).to.eq(3);
expect(sanityCheck(wf)).to.eq(true);
});

it('duplicate at fork', () => {
const testWf = _.cloneDeep(wf);
workflowService.branchWorkflow(testWf, n3.id);
expect(testWf.nodes.length).to.eq(6);
expect(testWf.edges.length).to.eq(6);
expect(testWf.edges.filter((edge) => edge.source === n1.id).length).to.eq(2);
expect(testWf.edges.filter((edge) => edge.source === n2.id).length).to.eq(2);
expect(_nodes(testWf).length).to.eq(6);
expect(_edges(testWf).length).to.eq(6);
expect(_edges(testWf).filter((edge) => edge.source === n1.id).length).to.eq(2);
expect(_edges(testWf).filter((edge) => edge.source === n2.id).length).to.eq(2);
expect(sanityCheck(wf)).to.eq(true);
});
});
Expand Down Expand Up @@ -349,8 +358,8 @@ describe('workflow operator with multiple output types', () => {
);

expect(datasetNode.inputs[0].value).toMatchObject(['dataset xyz']);
expect(wf.edges.length).eq(1);
workflowService.removeEdge(wf, wf.edges[0].id);
expect(_edges(wf).length).eq(1);
workflowService.removeEdge(wf, _edges(wf)[0].id);
});

it('dataset|model => model', () => {
Expand All @@ -364,8 +373,8 @@ describe('workflow operator with multiple output types', () => {
);

expect(modelNode.inputs[0].value).toMatchObject(['model abc']);
expect(wf.edges.length).eq(1);
workflowService.removeEdge(wf, wf.edges[0].id);
expect(_edges(wf).length).eq(1);
workflowService.removeEdge(wf, _edges(wf)[0].id);
});

it('dataset|model => test', () => {
Expand All @@ -379,7 +388,7 @@ describe('workflow operator with multiple output types', () => {
);

expect(testNode.inputs[0].value).toBeNull();
expect(wf.edges.length).eq(0);
expect(_edges(wf).length).eq(0);
});

it('edge case many to many', () => {
Expand All @@ -392,6 +401,6 @@ describe('workflow operator with multiple output types', () => {
[]
);
expect(edgeCaseNode.inputs[0].value).toBeNull();
expect(wf.edges.length).eq(0);
expect(_edges(wf).length).eq(0);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,17 @@ public ResponseEntity<Workflow> getWorkflow(
}
)
public ResponseEntity<Workflow> createWorkflow(
@RequestBody final Workflow item,
@RequestBody final Workflow workflow,
@RequestParam(name = "project-id", required = false) final UUID projectId
) {
final Schema.Permission permission = projectService.checkPermissionCanWrite(
currentUserService.get().getId(),
projectId
);
try {
return ResponseEntity.status(HttpStatus.CREATED).body(workflowService.createAsset(item, projectId, permission));
return ResponseEntity.status(HttpStatus.CREATED).body(
workflowService.createAsset(workflow, projectId, permission)
);
} catch (final IOException e) {
final String error = "Unable to create workflow";
log.error(error, e);
Expand Down Expand Up @@ -181,6 +183,7 @@ public ResponseEntity<Workflow> updateWorkflow(
);
try {
workflow.setId(id);

final Optional<Workflow> updated = workflowService.updateAsset(workflow, projectId, permission);
return updated.map(ResponseEntity::ok).orElseGet(() -> ResponseEntity.notFound().build());
} catch (final IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package software.uncharted.terarium.hmiserver.models.dataservice.workflow;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.Serializable;
import java.util.List;
import java.util.UUID;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import software.uncharted.terarium.hmiserver.annotations.TSIgnore;
import software.uncharted.terarium.hmiserver.annotations.TSModel;
import software.uncharted.terarium.hmiserver.annotations.TSOptional;
import software.uncharted.terarium.hmiserver.models.SupportAdditionalProperties;

@Data
Expand All @@ -14,15 +19,28 @@ public class WorkflowEdge extends SupportAdditionalProperties implements Seriali

private UUID id;
private UUID workflowId;

@TSIgnore
private Long version;

private Boolean isDeleted;

private UUID source;
private UUID sourcePortId;

private UUID target;
private UUID targetPortId;

private List<JsonNode> points;

/**
 * Produces a copy of this edge re-targeted at another workflow and node pair.
 * The copy receives a freshly generated id; the source/target port ids are
 * carried over unchanged from this edge.
 *
 * @param workflowId id of the workflow the copy belongs to
 * @param source     id of the node the copied edge starts from
 * @param target     id of the node the copied edge points to
 * @return the re-targeted copy
 */
public WorkflowEdge clone(final UUID workflowId, final UUID source, final UUID target) {
	final WorkflowEdge copy = (WorkflowEdge) super.clone();
	copy.setId(UUID.randomUUID());
	copy.setWorkflowId(workflowId);
	copy.setSourcePortId(sourcePortId);
	copy.setSource(source);
	copy.setTargetPortId(targetPortId);
	copy.setTarget(target);
	return copy;
}
}
Loading

0 comments on commit 2086dd0

Please sign in to comment.