Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions dag_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""
Metaflow DAG Parser
-------------------
Parses a Metaflow flow file using the AST module (zero dependencies)
and outputs the DAG topology as JSON:

{
"steps": ["start", "process", "end"],
"edges": [["start", "process"], ["process", "end"]],
"error": null
}

Usage:
python dag_parser.py <path_to_flow.py>
"""

import ast
import json
import sys


def parse_next_args(call_node):
"""Extract step names from a self.next(...) call node."""
targets = []
for arg in call_node.args:
# self.next(self.some_step) -> Attribute node
if isinstance(arg, ast.Attribute):
targets.append(arg.attr)
return targets


def parse_flow(filepath):
try:
with open(filepath, "r", encoding="utf-8") as f:
source = f.read()
except Exception as e:
return {"steps": [], "edges": [], "error": str(e)}

try:
tree = ast.parse(source, filename=filepath)
except SyntaxError as e:
return {"steps": [], "edges": [], "error": f"SyntaxError: {e}"}

steps = []
edges = []

for node in ast.walk(tree):
# Find classes that inherit from FlowSpec
if not isinstance(node, ast.ClassDef):
continue

is_flow = any(
(isinstance(b, ast.Name) and b.id == "FlowSpec")
or (isinstance(b, ast.Attribute) and b.attr == "FlowSpec")
for b in node.bases
)
if not is_flow:
continue

# Collect all @step methods
step_methods = set()
for item in node.body:
if not isinstance(item, ast.FunctionDef):
continue
has_step_decorator = any(
(isinstance(d, ast.Name) and d.id == "step")
or (isinstance(d, ast.Attribute) and d.attr == "step")
for d in item.decorator_list
)
if has_step_decorator:
step_methods.add(item.name)

steps = list(step_methods)

# Extract edges from self.next(...) calls inside each step method
for item in node.body:
if not isinstance(item, ast.FunctionDef):
continue
if item.name not in step_methods:
continue

for subnode in ast.walk(item):
if not isinstance(subnode, ast.Call):
continue
func = subnode.func
# Match self.next(...)
if (
isinstance(func, ast.Attribute)
and func.attr == "next"
and isinstance(func.value, ast.Name)
and func.value.id == "self"
):
for target in parse_next_args(subnode):
edges.append([item.name, target])

# Only process the first FlowSpec subclass found
break

# Topological sort: BFS from "start" for display ordering
ordered = []
if "start" in steps:
visited = set()
queue = ["start"]
adj = {}
for s in steps:
adj[s] = []
for frm, to in edges:
if frm in adj:
adj[frm].append(to)

while queue:
node_name = queue.pop(0)
if node_name in visited:
continue
visited.add(node_name)
ordered.append(node_name)
for neighbor in adj.get(node_name, []):
if neighbor not in visited:
queue.append(neighbor)

# Add any steps not reachable from start
for s in steps:
if s not in visited:
ordered.append(s)
steps = ordered

return {"steps": steps, "edges": edges, "error": None}


if __name__ == "__main__":
if len(sys.argv) < 2:
print(json.dumps({"steps": [], "edges": [], "error": "No file path provided"}))
sys.exit(1)

result = parse_flow(sys.argv[1])
print(json.dumps(result))
158 changes: 144 additions & 14 deletions extension.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const vscode = require('vscode');
const path = require('path');
const cp = require('child_process');
const fs = require('fs');

let sharedTerminal = null;

Expand Down Expand Up @@ -27,18 +29,18 @@ async function runPythonCommand(scriptName) {
}

if (!funcName) {
vscode.window.showErrorMessage("No enclosing Python function found.");
vscode.window.showErrorMessage('No enclosing Python function found.');
return;
}

const filePath = doc.fileName;
const fileDir = path.dirname(filePath);

let command = '';
if (scriptName == 'spin_func')
command = `python ${filePath} spin ${funcName}`;
if (scriptName === 'spin_func')
command = `python "${filePath}" spin ${funcName}`;
else
command = `python ${filePath} run`;
command = `python "${filePath}" run`;

// Reuse or create a single shared terminal
if (!sharedTerminal || sharedTerminal.exitStatus !== undefined) {
Expand All @@ -48,14 +50,139 @@ async function runPythonCommand(scriptName) {
sharedTerminal.show();
sharedTerminal.sendText(`cd "${fileDir}"`);
sharedTerminal.sendText(command);
}

/*
vscode.window.showInformationMessage(
`${scriptName.toUpperCase()}: ${funcName} from ${path.basename(filePath)}`
);
*/
// ---------------------------------------------------------------------------
// DAG WebView
// ---------------------------------------------------------------------------

let dagPanel = null;
let dagWatcher = null;

/**
* Runs dag_parser.py for the given file and returns { steps, edges, error }.
*/
function parseDag(filePath, extensionPath) {
return new Promise((resolve) => {
const parserScript = path.join(extensionPath, 'dag_parser.py');
const proc = cp.spawn('python', [parserScript, filePath]);

let stdout = '';
let stderr = '';

proc.stdout.on('data', (d) => { stdout += d.toString(); });
proc.stderr.on('data', (d) => { stderr += d.toString(); });

proc.on('close', () => {
try {
resolve(JSON.parse(stdout));
} catch (_) {
resolve({ steps: [], edges: [], error: stderr || 'Failed to parse DAG output.' });
}
});

proc.on('error', (err) => {
resolve({ steps: [], edges: [], error: `Could not start Python: ${err.message}` });
});
});
}

/**
* Loads dag_webview.html from the media folder.
*/
function getWebviewContent(extensionUri) {
const htmlPath = path.join(extensionUri.fsPath, 'media', 'dag_webview.html');
return fs.readFileSync(htmlPath, 'utf8');
}

/**
* Navigates the active editor cursor to the def line of the given step.
*/
async function navigateToStep(stepName) {
const editor = vscode.window.activeTextEditor;
if (!editor) return;

const lines = editor.document.getText().split('\n');
const re = new RegExp(`^\\s*(?:async\\s+)?def\\s+${stepName}\\s*\\(`);

for (let i = 0; i < lines.length; i++) {
if (re.test(lines[i])) {
const pos = new vscode.Position(i, 0);
editor.selection = new vscode.Selection(pos, pos);
editor.revealRange(new vscode.Range(pos, pos), vscode.TextEditorRevealType.InCenter);
return;
}
}
vscode.window.showWarningMessage(`Step "${stepName}" not found in the current file.`);
}

/**
* Command: Open (or refresh) the Metaflow DAG WebView panel.
*/
async function showDagView(context) {
const editor = vscode.window.activeTextEditor;
if (!editor || editor.document.languageId !== 'python') {
vscode.window.showErrorMessage('Please open a Metaflow Python flow file first.');
return;
}

const filePath = editor.document.fileName;

// Create the panel if it doesn't exist, otherwise reveal it
if (!dagPanel) {
dagPanel = vscode.window.createWebviewPanel(
'metaflowDag',
'Metaflow DAG',
vscode.ViewColumn.Beside,
{
enableScripts: true,
localResourceRoots: [vscode.Uri.joinPath(context.extensionUri, 'media')],
retainContextWhenHidden: true,
}
);

dagPanel.webview.html = getWebviewContent(context.extensionUri);

// Node click → navigate to step in editor
dagPanel.webview.onDidReceiveMessage(
(msg) => {
if (msg.command === 'navigateToStep') {
navigateToStep(msg.step);
}
},
undefined,
context.subscriptions
);

dagPanel.onDidDispose(() => {
dagPanel = null;
if (dagWatcher) { dagWatcher.dispose(); dagWatcher = null; }
}, null, context.subscriptions);
} else {
dagPanel.reveal(vscode.ViewColumn.Beside);
}

// Initial render
const dagData = await parseDag(filePath, context.extensionPath);
dagData.fileName = path.basename(filePath);
dagPanel.webview.postMessage({ command: 'renderDag', data: dagData });

// Live-refresh on every save of this file
if (dagWatcher) { dagWatcher.dispose(); }
dagWatcher = vscode.workspace.onDidSaveTextDocument(async (savedDoc) => {
if (savedDoc.fileName === filePath && dagPanel) {
const refreshed = await parseDag(savedDoc.fileName, context.extensionPath);
refreshed.fileName = path.basename(savedDoc.fileName);
dagPanel.webview.postMessage({ command: 'renderDag', data: refreshed });
}
});
context.subscriptions.push(dagWatcher);
}

// ---------------------------------------------------------------------------
// Extension lifecycle
// ---------------------------------------------------------------------------

function activate(context) {
const runCmd = vscode.commands.registerCommand(
'extension.runPythonFunction',
Expand All @@ -67,14 +194,17 @@ function activate(context) {
() => runPythonCommand('spin_func')
);

context.subscriptions.push(runCmd, spinCmd);
const dagCmd = vscode.commands.registerCommand(
'extension.showDagView',
() => showDagView(context)
);

context.subscriptions.push(runCmd, spinCmd, dagCmd);
}

function deactivate() {
if (sharedTerminal) {
sharedTerminal.dispose();
sharedTerminal = null;
}
if (sharedTerminal) { sharedTerminal.dispose(); sharedTerminal = null; }
if (dagPanel) { dagPanel.dispose(); dagPanel = null; }
}

module.exports = { activate, deactivate };
Loading