Skip to content

feat: adding interruption functionality to interrupt graphs #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion Sources/LangGraph/LangGraph.swift
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,15 @@ extension StateGraph {

/// The schema representing the channels in the graph.
let schema: Channels

/// interrupt flag
var interruptFlag: Bool

/// interrupt message
var interruptMessage: String?

/// interuptionHandler
var interuptionHandler: ((_ state: AgentState) -> Void)?

/**
Initializes a new instance of `CompiledGraph`.
Expand All @@ -758,6 +767,7 @@ extension StateGraph {
self.edges = Dictionary()
self.entryPoint = owner.entryPoint!
self.finishPoint = owner.finishPoint
self.interruptFlag = false

owner.nodes.forEach { [unowned self] node in
nodes[node.id] = node.action
Expand Down Expand Up @@ -904,6 +914,16 @@ extension StateGraph {
( key, value as! AsyncThrowingStream<NodeOutput<State>, Error>)
}).first
}

private func checkInterrupt(currentState: AgentState) throws {
if self.interruptFlag {
let message: String = self.interruptMessage ?? "Interrupted"
self.interruptMessage = nil
self.interruptFlag = false
self.interuptionHandler?(currentState)
throw CompiledGraphError.executionError(message)
}
}

/**
Streams the node outputs based on the given inputs.
Expand Down Expand Up @@ -933,6 +953,7 @@ extension StateGraph {
}

try Task.checkCancellation()
try self.checkInterrupt(currentState: currentState)
let partialState = try await action(currentState)

// Support embed stream
Expand All @@ -958,6 +979,7 @@ extension StateGraph {
let output = NodeOutput(node: currentNodeId, state: currentState)

try Task.checkCancellation()
try self.checkInterrupt(currentState: currentState)
continuation.yield(output)

if(currentNodeId == finishPoint) {
Expand All @@ -976,6 +998,12 @@ extension StateGraph {

return stream
}

public func interrupt(_ message: String?) {
self.interruptFlag = true
self.interruptMessage = message
}


/**
Runs the graph and returns the final state.
Expand All @@ -986,7 +1014,8 @@ extension StateGraph {
- Throws: An error if the invocation fails.
- Returns: The final state.
*/
public func invoke(inputs: PartialAgentState, verbose: Bool = false) async throws -> State {
public func invoke(inputs: PartialAgentState, verbose: Bool = false, handler: ((_ state: AgentState) -> Void)? = nil) async throws -> State {
self.interuptionHandler = handler
let initResult: [NodeOutput<State>] = []
let result = try await stream(inputs: inputs).reduce(initResult, { partialResult, output in
[output]
Expand Down