
Resuming after NodeInterrupt using subgraphs seems buggy #1552

@baptistejamin

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangGraph.js documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangGraph.js rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangGraph (or the specific integration package).

Example Code

A working implementation:

  • Waits for a "hello" event using NodeInterrupt
  • Routes to two other listening nodes, "demo" and "pricing"
  • Follow-up execution resumes as expected
import { StateGraph, Annotation, START, END, BaseCheckpointSaver } from "@langchain/langgraph";
import { MemorySaver, NodeInterrupt } from "@langchain/langgraph";
// Define state with annotations
export const ParallelTestState = Annotation.Root({
  event: Annotation<string>,
  supportOffline: Annotation<boolean | undefined>
});

/**************************************************************************
 * NODES
 ***************************************************************************/

// Hello event node
const helloEventNode = async (state: typeof ParallelTestState.State) => {
  console.log("hello event?");

  if (state.event !== "hello") {
    throw new NodeInterrupt("hello_event");
  }

  console.log("hello event OK");

  return state;
};

// Follow-up block after hello check
const helloFollowUpNode = async (state: typeof ParallelTestState.State) => {
  console.log("✅ just said hello");

  return state;
};

// Support offline check and mute block
const supportOfflineCheckNode = async (state: typeof ParallelTestState.State) => {
  console.log("🔍 Checking if support is offline...");

  // Simulate checking support status
  const isSupportOffline = true;

  // Add support status to state for conditional routing
  return {
    ...state,
    supportOffline: isSupportOffline
  };
};

// Demo event node
const demoEventNode = async (state: typeof ParallelTestState.State) => {
  console.log("demo event?");

  if (state.event !== "demo") {
    throw new NodeInterrupt("demo_event");
  }

  console.log("demo event OK");

  return state;
};

// Demo follow-up block
const demoFollowUpNode = async (state: typeof ParallelTestState.State) => {
  console.log("✅ just said demo");

  return state;
};

// Pricing event node
const pricingEventNode = async (state: typeof ParallelTestState.State) => {
  console.log("pricing event?");

  if (state.event !== "pricing") {
    throw new NodeInterrupt("pricing_event");
  }

  console.log("pricing event OK");

  return state;
};

// Pricing follow-up block
const pricingFollowUpNode = async (state: typeof ParallelTestState.State) => {
  console.log("✅ just said pricing");

  return state;
};

/**************************************************************************
 * GRAPH CONSTRUCTION
 ***************************************************************************/

const createGraph = (checkpointer: BaseCheckpointSaver<number>) => {
  const workflow = new StateGraph(ParallelTestState);

  // Add nodes
  workflow.addNode("hello_event", helloEventNode);
  workflow.addNode("hello_followup", helloFollowUpNode);
  workflow.addNode("support_offline_check", supportOfflineCheckNode);
  workflow.addNode("demo_event", demoEventNode);
  workflow.addNode("demo_followup", demoFollowUpNode);
  workflow.addNode("pricing_event", pricingEventNode);
  workflow.addNode("pricing_followup", pricingFollowUpNode);

  // Set entry point using START
  workflow.addEdge(START, "hello_event");

  // Add edges for sequential flow
  workflow.addEdge("hello_event", "hello_followup");
  workflow.addEdge("hello_followup", "support_offline_check");

  // Route through the router node
  workflow.addConditionalEdges("support_offline_check", (state) => {
    if (state.supportOffline === false) {
      return END;
    }

    return ["demo_event", "pricing_event"];
  }, [END, "demo_event", "pricing_event"]);

  // Add edges for each branch
  workflow.addEdge("demo_event", "demo_followup");
  workflow.addEdge("pricing_event", "pricing_followup");

  // Both branches converge to END
  workflow.addEdge("demo_followup", END);
  workflow.addEdge("pricing_followup", END);

  return workflow.compile({ checkpointer });
};

/**************************************************************************
 * MAIN EXECUTION
 ***************************************************************************/

const main = async () => {
  try {
    // Initialize checkpointer
    const checkpointer = new MemorySaver();

    // Create and compile graph
    const graph = createGraph(checkpointer);

    // Set event to "hello" as requested
    const initialState = {
      event: "hello"
    } as typeof ParallelTestState.State;

    const threadId = "parallel-test-thread" + Math.random().toString(36).substring(2, 15);

    const config = {
      configurable: { thread_id: threadId }
    };

    // Execute the graph
    const result = await graph.invoke(initialState, config);

    console.log("🔄 First run completed", result);

    await graph.updateState(config, { event: "demo" });

    const final = await graph.invoke(null, config);

    console.log("✅ Graph execution completed successfully!");
    console.log("📊 Final state:", final);
  } catch (error) {
    console.error("❌ Graph execution failed:", error);
  }
};

// Run if this file is executed directly
if (import.meta.url === `file://${process.argv[1]}`) {
  main();
}

export { createGraph, main };

Same code, but with the main logic wrapped in a subgraph. When resumed, the graph restarts from the beginning.

/**************************************************************************
 * IMPORTS
 ***************************************************************************/

// NPM
import { StateGraph, Annotation, START, END, BaseCheckpointSaver } from "@langchain/langgraph";
import { MemorySaver, NodeInterrupt } from "@langchain/langgraph";

/**************************************************************************
 * TYPES
 ***************************************************************************/

// Define state with annotations
export const ParallelTestState = Annotation.Root({
  event: Annotation<string>,
  supportOffline: Annotation<boolean | undefined>
});

/**************************************************************************
 * NODES
 ***************************************************************************/

// Hello event node
const helloEventNode = async (state: typeof ParallelTestState.State) => {
  console.log("hello event?");

  if (state.event !== "hello") {
    throw new NodeInterrupt("hello_event");
  }

  console.log("hello event OK");

  return state;
};

// Follow-up block after hello check
const helloFollowUpNode = async (state: typeof ParallelTestState.State) => {
  console.log("✅ just said hello");

  return state;
};

// Support offline check and mute block
const supportOfflineCheckNode = async (state: typeof ParallelTestState.State) => {
  console.log("🔍 Checking if support is offline...");

  // Simulate checking support status
  const isSupportOffline = true;

  // Add support status to state for conditional routing
  return {
    ...state,
    supportOffline: isSupportOffline
  };
};

// Demo event node
const demoEventNode = async (state: typeof ParallelTestState.State) => {
  console.log("demo event?");

  if (state.event !== "demo") {
    throw new NodeInterrupt("demo_event");
  }

  console.log("demo event OK");

  return state;
};

// Demo follow-up block
const demoFollowUpNode = async (state: typeof ParallelTestState.State) => {
  console.log("✅ just said demo");

  return state;
};

// Pricing event node
const pricingEventNode = async (state: typeof ParallelTestState.State) => {
  console.log("pricing event?");

  if (state.event !== "pricing") {
    throw new NodeInterrupt("pricing_event");
  }

  console.log("pricing event OK");

  return state;
};

// Pricing follow-up block
const pricingFollowUpNode = async (state: typeof ParallelTestState.State) => {
  console.log("✅ just said pricing");

  return state;
};

/**************************************************************************
 * GRAPH CONSTRUCTION
 ***************************************************************************/

const createGraph = (checkpointer: BaseCheckpointSaver<number>) => {
  // Create main workflow
  const mainWorkflow = new StateGraph(ParallelTestState);

  // Create subscenario workflow
  const subscenarioWorkflow = new StateGraph(ParallelTestState);

  // Add nodes to subscenario
  subscenarioWorkflow.addNode("hello_event", helloEventNode);
  subscenarioWorkflow.addNode("hello_followup", helloFollowUpNode);
  subscenarioWorkflow.addNode("support_offline_check", supportOfflineCheckNode);
  subscenarioWorkflow.addNode("demo_event", demoEventNode);
  subscenarioWorkflow.addNode("demo_followup", demoFollowUpNode);
  subscenarioWorkflow.addNode("pricing_event", pricingEventNode);
  subscenarioWorkflow.addNode("pricing_followup", pricingFollowUpNode);

  // Set entry point using START
  subscenarioWorkflow.addEdge(START, "hello_event");

  // Add edges for sequential flow
  subscenarioWorkflow.addEdge("hello_event", "hello_followup");
  subscenarioWorkflow.addEdge("hello_followup", "support_offline_check");

  // Route through the router node
  subscenarioWorkflow.addConditionalEdges("support_offline_check", (state) => {
    if (state.supportOffline === false) {
      return END;
    }

    return ["demo_event", "pricing_event"];
  }, [END, "demo_event", "pricing_event"]);

  // Add edges for each branch
  subscenarioWorkflow.addEdge("demo_event", "demo_followup");
  subscenarioWorkflow.addEdge("pricing_event", "pricing_followup");

  // Both branches converge to END
  subscenarioWorkflow.addEdge("demo_followup", END);
  subscenarioWorkflow.addEdge("pricing_followup", END);

  // Compile the subscenario
  const compiledSubscenario = subscenarioWorkflow.compile();

  // Add subscenario as a node to main workflow
  mainWorkflow.addNode("parallel_test_subscenario", compiledSubscenario);

  // Set entry point to the subscenario
  mainWorkflow.addEdge(START, "parallel_test_subscenario");

  // Connect subscenario to END
  mainWorkflow.addEdge("parallel_test_subscenario", END);

  return mainWorkflow.compile({ checkpointer });
};

/**************************************************************************
 * MAIN EXECUTION
 ***************************************************************************/

const main = async () => {
  try {
    // Initialize checkpointer
    const checkpointer = new MemorySaver();

    // Create and compile graph
    const graph = createGraph(checkpointer);

    // Set event to "hello" as requested
    const initialState = {
      event: "hello"
    } as typeof ParallelTestState.State;

    const threadId = "parallel-test-thread" + Math.random().toString(36).substring(2, 15);

    const config = {
      configurable: { thread_id: threadId }
    };

    // Execute the graph
    const initialStream = await graph.stream(initialState, {
      ...config, subgraphs: true
    });

    for await (const chunk of initialStream) {
      console.log("🔄 Initial stream chunk:", chunk);
    }

    // With updateState disabled, the graph resumes from the last checkpoint.
    // With updateState enabled, execution restarts from the beginning.

    const outerGraphState = await graph.getState(config, { subgraphs: true });

    console.log("🔄 Outer graph state:", outerGraphState);

    // This works: target the subgraph's checkpoint config.
    //await graph.updateState(outerGraphState.tasks[0].state.config, { event: "demo" });

    // This does not work: it resumes at the start of the graph. If commented out,
    // the run completes, but then there is no way to update the state.
    await graph.updateState(config, { event: "demo" });

    const stream = await graph.stream(null, {
      ...config, subgraphs: true
    });

    for await (const chunk of stream) {
      console.log("🔄 Stream chunk:", chunk);
    }

    console.log("✅ Graph execution completed successfully!");
    //console.log("📊 Final state:", final);
  } catch (error) {
    console.error("❌ Graph execution failed:", error);
  }
};

// Run if this file is executed directly
if (import.meta.url === `file://${process.argv[1]}`) {
  main();
}

export { createGraph, main };

Error Message and Stack Trace (if applicable)

No response

Description

  • We are trying to use LangGraph to replace our previous execution graph engine at Crisp, for customer support.
  • Users can have different graphs (including subgraphs) that listen to events and follow up on executions.
  • When using a main graph with parallel branches and NodeInterrupts, everything works fine.
  • As soon as the scenario is encapsulated in a subgraph, execution restarts from the beginning when resumed.
  • await graph.updateState(outerGraphState.tasks[0].state.config, { event: "demo" }); seems to do the job; however, the next node is not triggered (see the sketch after this list).
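
For reference, here is a condensed sketch of the two resume paths described above. It reuses the same APIs as the full subgraph example (graph, config, getState, updateState); the exact shape of tasks[0].state may differ between versions, so treat this as a sketch rather than a verified fix.

// Sketch only: assumes the graph, config, and checkpointer from the subgraph example above.

// 1. Run until the subgraph hits the NodeInterrupt on "hello_event".
for await (const chunk of await graph.stream({ event: "hello" }, { ...config, subgraphs: true })) {
  console.log(chunk);
}

// 2. Inspect the parent state; the interrupted subgraph appears as a pending task.
const parentState = await graph.getState(config, { subgraphs: true });
const subgraphConfig = parentState.tasks[0].state.config;

// 3a. Updating the subgraph checkpoint keeps the resume position,
//     but the follow-up node after the interrupt never runs.
await graph.updateState(subgraphConfig, { event: "demo" });

// 3b. Updating the parent checkpoint instead makes the whole subgraph
//     re-run from its START node on the next invocation.
// await graph.updateState(config, { event: "demo" });

// 4. Resume from the checkpoint.
for await (const chunk of await graph.stream(null, { ...config, subgraphs: true })) {
  console.log(chunk);
}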

System Info

  • Node v22.18.0
  • Langgraph v0.4.6
