A workflow agent doesn't execute linearly from step 1 to step N. It reasons, acts, evaluates results, asks clarifying questions, and sometimes needs to be nudged back on track. Modeling this as a directed graph — where each node is a specific behavior and edges are conditional — turns an unpredictable LLM loop into a debuggable, observable state machine.
The Five Nodes
                   START
                     │
                     ▼
┌─────────────────────────────────────────┐
│                 reason                  │
│   LLM decides: call a tool? respond?    │
│   need user input? workflow is done?    │
└──┬──────────────┬────────────────┬──────┘
   │              │                │
   ▼              ▼                ▼
call_tool   request_input        nudge
   │              │                │
   ▼              ▼                └──▶ reason
 finish ◀─── reason (next turn)
   │
   ▼
  END
Each node has one responsibility. No node performs two kinds of work. The routing logic between nodes is explicit — not buried in conditional branches inside a single function.
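To make the "explicit routing" idea concrete, here is a minimal sketch of a graph driver in plain Python. It is not the framework the article's agent runs on; `run_graph`, the `nodes` and `routers` dictionaries, and the toy counter workflow are all hypothetical names chosen for illustration. Each node returns a partial state update, and a separate router picks the next node.

```python
from typing import Callable, Dict

State = dict
Node = Callable[[State], dict]    # returns a partial state update
Router = Callable[[State], str]   # returns the name of the next node

END = "END"


def run_graph(nodes: Dict[str, Node], routers: Dict[str, Router],
              start: str, state: State, max_steps: int = 50) -> State:
    """Run nodes until a router returns END (hypothetical driver, not a real API)."""
    current = start
    for _ in range(max_steps):
        # Apply the node's update with overwrite semantics.
        state = {**state, **nodes[current](state)}
        # Routing lives outside the node, in its own function.
        current = routers[current](state)
        if current == END:
            return state
    raise RuntimeError("graph did not terminate within max_steps")


# Toy two-node graph: "reason" counts up, then "finish" marks completion.
nodes = {
    "reason": lambda s: {"count": s["count"] + 1},
    "finish": lambda s: {"is_complete": True},
}
routers = {
    "reason": lambda s: "finish" if s["count"] >= 3 else "reason",
    "finish": lambda s: END,
}

final_state = run_graph(nodes, routers, "reason", {"count": 0})
```

Because nodes never choose their successor, swapping a router changes the control flow without touching any node body.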
The reason Node
The reason node is the only place where the language model runs. It receives the full conversation history, the system prompt, and the available tool definitions, and produces either a tool call or a text response.
async def reason(state: GoalAgentState) -> dict:
    # Safety rails first
    if state["iteration_count"] >= state["max_iterations"]:
        return {"error": "MAX_ITERATIONS", "is_complete": True}
    # Assumption: the running total is kept under token_usage["total_input_tokens"]
    total_input_tokens_so_far = state["token_usage"].get("total_input_tokens", 0)
    token_budget = max(100_000, state["max_iterations"] * 10_000)
    if total_input_tokens_so_far > token_budget:
        return {"error": "TOKEN_BUDGET_EXCEEDED", "is_complete": True}
    # Determine tool mode
    unique_tools_used = count_unique_tools(state["messages"])
    threshold = max(2, min(state["min_tools_for_auto"], len(available_tools)))
    tool_choice = "any" if unique_tools_used < threshold else None
    # "any" = LLM must call a tool (forced mode)
    # None = LLM can respond with text (auto mode)
    response = await llm.invoke(
        messages=build_messages(state),
        tools=available_tools,
        tool_choice=tool_choice,
    )
    return {"messages": [response], "iteration_count": state["iteration_count"] + 1}
The forced/auto mode distinction is important. In forced mode, the LLM cannot respond with "What would you like me to do?" — it must call a tool. This ensures the agent actually processes the user's file before asking clarifying questions. The threshold is configurable per workflow via min_tools_for_auto in the workflow spec.
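The threshold arithmetic is easier to reason about when isolated. The sketch below lifts the formula from the reason node into a standalone function; `select_tool_choice` and its parameter names are hypothetical, and only the formula itself comes from the listing above.

```python
from typing import Optional


def select_tool_choice(unique_tools_used: int,
                       min_tools_for_auto: int,
                       num_available_tools: int) -> Optional[str]:
    """Return "any" (forced mode) or None (auto mode), using the article's formula."""
    threshold = max(2, min(min_tools_for_auto, num_available_tools))
    return "any" if unique_tools_used < threshold else None


# No tools used yet, workflow asks for 3 distinct tools: forced mode.
first_turn = select_tool_choice(0, 3, 5)
# Three distinct tools used: the LLM may now answer with text.
later_turn = select_tool_choice(3, 3, 5)
# The floor of 2 applies even when the workflow spec asks for only 1.
floor_case = select_tool_choice(1, 1, 5)
```

One consequence of the `max(2, ...)` floor, if the formula is used exactly as written: a workflow with a single available tool can never reach the threshold on its own, so it would leave forced mode only through the other guardrails (iteration cap, token budget, or nudge cap).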
The call_tool Node
The call_tool node executes the tool calls the LLM decided to make. It handles three categories of tools differently:
async def call_tool(state: GoalAgentState) -> dict:
    tool_calls = state["messages"][-1].tool_calls
    results = []
    is_complete = False
    for call in tool_calls:
        if call.name == "request_user_input":
            # Trigger human-in-the-loop interrupt
            # (handled by separate request_input node)
            continue
        if call.name == "store_artifact":
            # Upload content to object storage
            result = await upload_to_storage(call.args)
            results.append(ToolMessage(result))
            continue
        if call.name == "complete_workflow":
            # Signal workflow completion (the guardrail schedules this call last)
            is_complete = True
            continue
        # Data tools: invoke MCP gateway
        if is_duplicate_call(call, state["messages"]):
            results.append(ToolMessage("[skipped: duplicate call]"))
            continue
        result = await mcp_client.invoke(call.name, call.args)
        results.append(ToolMessage(result))
    # Return completion as a state update rather than mutating state in place
    return {"messages": results, "is_complete": is_complete}
Duplicate call detection prevents loops where the LLM re-calls the same tool with identical arguments. This happens occasionally when the LLM's context window doesn't retain that a tool already produced results. Deduplication lets the agent recover from this gracefully.
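The article does not show how `is_duplicate_call` is implemented. One plausible approach, sketched below, compares a canonical signature (tool name plus key-sorted JSON arguments) against earlier calls. The function names, the dict-shaped messages, and the `parse_resume` tool are assumptions made for illustration. Note that the history passed in should exclude the message that contains the call being checked, otherwise every call would match itself.

```python
import json
from typing import Any, Dict, List


def call_signature(name: str, args: Dict[str, Any]) -> str:
    """Canonical string for a tool call: name plus key-sorted JSON arguments."""
    return name + ":" + json.dumps(args, sort_keys=True, default=str)


def seen_before(name: str, args: Dict[str, Any],
                history: List[Dict[str, Any]]) -> bool:
    """True if an earlier message in history holds an identical tool call."""
    target = call_signature(name, args)
    for message in history:
        for prior in message.get("tool_calls", []):
            if call_signature(prior["name"], prior["args"]) == target:
                return True
    return False


# Hypothetical history: one earlier assistant turn that called parse_resume.
history = [
    {"role": "assistant",
     "tool_calls": [{"name": "parse_resume",
                     "args": {"file_id": "f1", "lang": "en"}}]},
    {"role": "tool", "content": "parsed"},
]

same_call = seen_before("parse_resume", {"lang": "en", "file_id": "f1"}, history)
new_call = seen_before("parse_resume", {"file_id": "f2", "lang": "en"}, history)
```

Sorting keys before serializing means argument order does not matter, so `{"lang": ..., "file_id": ...}` and `{"file_id": ..., "lang": ...}` count as the same call.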
The complete_workflow reordering guardrail ensures it always executes last, even if the LLM includes it in the middle of a batch of tool calls. Finishing before storing artifacts would produce an empty result.
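The call_tool listing above does not show the reordering step itself. A minimal sketch of one way to do it, assuming tool calls are dicts with a `name` key (the helper name `order_tool_calls` and the `analyze` tool are hypothetical):

```python
from typing import Any, Dict, List


def order_tool_calls(tool_calls: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Move complete_workflow calls to the end, keeping the others in order."""
    # sorted() is stable and False sorts before True, so only
    # complete_workflow entries change position.
    return sorted(tool_calls, key=lambda call: call["name"] == "complete_workflow")


batch = [
    {"name": "analyze"},
    {"name": "complete_workflow"},
    {"name": "store_artifact"},
]
ordered_names = [call["name"] for call in order_tool_calls(batch)]
```

Relying on a stable sort keeps the relative order the LLM chose for every other tool, which matters when later calls depend on earlier results.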
The request_input Node — Human in the Loop
Some workflows require user decisions mid-execution. The request_input node pauses graph execution, persists the current state to a checkpoint store, and sends a SLOT_NEEDED event to the client.
async def request_input(state: GoalAgentState) -> dict:
    slot_request = extract_slot_request(state["messages"][-1])
    # Persist state so execution can resume later
    await checkpointer.save(state)
    # Emit interrupt event to client via SSE
    await sse_service.publish(state["job_id"], SlotNeededEvent(
        slot_id=slot_request["slot_id"],
        slot_type=slot_request["slot_type"],
        prompt=slot_request["prompt"],
        options=slot_request.get("options", []),
    ))
    # Interrupt graph execution here
    raise GraphInterrupt(pending_slot=slot_request)
When the user submits their response, the graph resumes from the checkpoint with the filled slot value injected into the state. The LLM sees the user's answer as a normal message in the conversation history and continues the workflow.
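The resume step can be pictured as building one more state update. The sketch below is an assumption about its shape, not the article's implementation: `build_resume_update`, the message format, and the `career_category` slot ID are hypothetical. It fills the slot, adds the answer to the conversation, and clears the pending request.

```python
from typing import Any, Dict


def build_resume_update(pending_slot: Dict[str, Any], value: Any) -> Dict[str, Any]:
    """State update applied when the user answers a pending slot (hypothetical)."""
    slot_id = pending_slot["slot_id"]
    return {
        # Merged into filled_slots by the MERGE reducer.
        "filled_slots": {slot_id: value},
        # Appended to history so the LLM sees the answer as a normal message.
        "messages": [{"role": "user", "content": f"[{slot_id}] {value}"}],
        # Cleared so routing does not pause again for the same slot.
        "pending_slot_request": None,
    }


pending = {"slot_id": "career_category", "slot_type": "single_choice",
           "prompt": "Pick a category"}
update = build_resume_update(pending, "Data Engineering")
```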
The resume_to_job_ready workflow uses this for mandatory category selection: the agent analyzes the resume, generates 3 career categories with role recommendations, then pauses for the user to choose — before proceeding to generate tailored resumes.
The nudge Node — Anti-Premature-Finish Guardrail
Sometimes the LLM responds with text instead of calling tools, before the workflow is actually complete. This is a false finish — the agent has "answered" the question without producing the required artifact.
def nudge(state: GoalAgentState) -> dict:
    if state["nudge_count"] >= MAX_NUDGE_ATTEMPTS:  # 3
        # Give up nudging; let reason decide in auto mode
        return {"nudge_count": state["nudge_count"] + 1}
    reminder = HumanMessage(
        "You responded with text, but the workflow is not yet complete. "
        "You must continue calling tools and store an artifact before finishing."
    )
    return {
        "messages": [reminder],
        "nudge_count": state["nudge_count"] + 1,
    }
The nudge cap (3 attempts) prevents infinite loops. After 3 nudges, the routing function allows the reason node to use auto mode regardless of how many tools have been called — the LLM is allowed to finish even if something went wrong.
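The routing side of this guardrail depends on a `needs_nudge` predicate that the article does not show. A minimal sketch follows, under the assumption that "not yet complete" means no `store_artifact` call has appeared in the history and that messages are plain dicts. Both the criterion and the helper `has_stored_artifact` are hypothetical.

```python
from typing import Any, Dict

MAX_NUDGE_ATTEMPTS = 3


def has_stored_artifact(state: Dict[str, Any]) -> bool:
    """True if any message in the history contains a store_artifact tool call."""
    return any(
        call["name"] == "store_artifact"
        for message in state["messages"]
        for call in message.get("tool_calls", [])
    )


def needs_nudge(state: Dict[str, Any]) -> bool:
    """Nudge only while under the cap and no artifact has been stored."""
    if state.get("nudge_count", 0) >= MAX_NUDGE_ATTEMPTS:
        return False  # cap reached: let the text response stand
    return not has_stored_artifact(state)


premature = {"messages": [{"role": "assistant", "content": "All done!"}],
             "nudge_count": 0}
capped = {**premature, "nudge_count": 3}
finished = {"messages": [{"role": "assistant",
                          "tool_calls": [{"name": "store_artifact", "args": {}}]},
                         {"role": "assistant", "content": "All done!"}],
            "nudge_count": 0}
```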
Routing Functions
Conditional routing between nodes is encoded in pure functions with explicit outputs:
def route_after_reason(state: GoalAgentState) -> str:
    last_msg = state["messages"][-1]
    if state.get("is_complete"):
        return "finish"
    if has_tool_calls(last_msg):
        if requests_user_input(last_msg):
            return "request_input"
        return "call_tool"
    # LLM responded with text (no tool calls)
    if needs_nudge(state):
        return "nudge"
    return "finish"  # auto mode response = workflow complete
def route_after_call_tool(state: GoalAgentState) -> str:
    if state.get("is_complete"):
        return "finish"
    if state.get("pending_slot_request"):
        return "request_input"
    return "reason"  # continue loop
Routing functions read only from state. They have no side effects. This makes them trivially testable: given any state dict, assert which node fires next.
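As an illustration of that testability, the sketch below copies `route_after_call_tool` (with the state typed as a plain `dict` so the example is self-contained) and checks each branch with a hand-built state. The test function name and the slot payload are hypothetical.

```python
def route_after_call_tool(state: dict) -> str:
    if state.get("is_complete"):
        return "finish"
    if state.get("pending_slot_request"):
        return "request_input"
    return "reason"  # continue loop


def test_route_after_call_tool() -> None:
    # Completion wins, even if a slot request is also pending.
    assert route_after_call_tool(
        {"is_complete": True, "pending_slot_request": {"slot_id": "x"}}
    ) == "finish"
    # A pending slot pauses for user input.
    assert route_after_call_tool(
        {"pending_slot_request": {"slot_id": "x"}}
    ) == "request_input"
    # Otherwise the loop returns to the reason node.
    assert route_after_call_tool({}) == "reason"


test_route_after_call_tool()
```

No LLM, tool gateway, or checkpoint store needs to be mocked, because the router touches nothing but the dict it is given.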
State Reducers
The graph state uses three reducer strategies, assigned per field:
GoalAgentState:
    messages        → APPEND (add_messages reducer, deduplicates by ID)
    filled_slots    → MERGE (shallow dict merge)
    token_usage     → MERGE + accumulate (per-turn records + running total)
    iteration_count → OVERWRITE (latest value wins)
    is_complete     → OVERWRITE
    error           → OVERWRITE
The APPEND reducer for messages is critical for conversation history. It merges new messages into the list while deduplicating by message ID — preventing the same tool result from appearing twice if a checkpoint is replayed.
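The reducer semantics can be sketched in plain Python. This is an illustration of the behavior described above, not the `add_messages` implementation; the function names, the dict-shaped messages, and the `turns` and `total_input_tokens` keys are assumptions. Here a message whose ID already exists replaces the stored copy in place, which gives the same result as skipping it when the replayed message is identical.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]


def append_messages(existing: List[Message], new: List[Message]) -> List[Message]:
    """APPEND: add new messages; a repeated ID replaces the stored message."""
    merged = list(existing)
    index_by_id = {m["id"]: i for i, m in enumerate(merged)}
    for message in new:
        if message["id"] in index_by_id:
            merged[index_by_id[message["id"]]] = message
        else:
            index_by_id[message["id"]] = len(merged)
            merged.append(message)
    return merged


def merge_slots(existing: Dict[str, Any], new: Dict[str, Any]) -> Dict[str, Any]:
    """MERGE: shallow dict merge, newer keys win."""
    return {**existing, **new}


def accumulate_usage(existing: Dict[str, Any], turn: Dict[str, int]) -> Dict[str, Any]:
    """MERGE + accumulate: keep per-turn records and a running input-token total."""
    return {
        "turns": existing.get("turns", []) + [turn],
        "total_input_tokens": existing.get("total_input_tokens", 0)
        + turn["input_tokens"],
    }


history = [{"id": "m1", "content": "hi"}, {"id": "m2", "content": "tool result"}]
replayed = append_messages(
    history,
    [{"id": "m2", "content": "tool result"}, {"id": "m3", "content": "next"}],
)
slots = merge_slots({"a": 1}, {"b": 2})
usage = accumulate_usage(accumulate_usage({}, {"input_tokens": 120}),
                         {"input_tokens": 80})
```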
The value of modeling execution as a graph isn't the graph itself — it's that every behavior lives in exactly one node, every transition lives in exactly one routing function, and nothing is hidden inside a long switch statement.
