Goal Lane workflows run asynchronously. A job submitted to a 120-iteration resume workflow will take several minutes to complete, during which the user is watching a progress indicator. The quality of that indicator — whether it moves smoothly or jumps unpredictably, whether it communicates meaningful milestones or just counts iterations — directly affects perceived reliability. This post describes the event architecture behind it.
Six Event Types
The streaming protocol defines six distinct event types, each serving a specific role in the client-server communication:
class SSEEventType(str, Enum):
PROGRESS = "progress" # Progress update (0-100), status, optional message
STATUS = "status" # Job status change (queued → executing → ...)
COMPLETED = "completed" # Workflow finished, artifacts ready
FAILED = "failed" # Terminal error, no artifacts
SLOT_NEEDED = "slot_needed" # Agent paused, user input required
KEEPALIVE = "keepalive" # Connection maintenance, no semantic content
PROGRESS events carry the 0–100 percentage, the current job status string, and an optional human-readable message describing what the agent is doing at that moment. SLOT_NEEDED is the pause event — it carries the pending slot definitions (type, prompt, options) that the client needs to render the decision UI. KEEPALIVE is sent at regular intervals during long-running computations to prevent proxies and load balancers from closing idle connections.
Wire Format and Reconnection
Each event is serialized in the standard streaming event format and includes a monotonic event ID:
def to_sse_format(self) -> str:
"""Serialize event to wire format.
Includes `id:` for client reconnection via Last-Event-ID header.
Clients that disconnect mid-stream can reconnect and resume from
where they left off — the server replays missed events.
"""
eid = self.event_id if self.event_id is not None else _next_event_id()
json_data = orjson.dumps(self.model_dump()).decode("utf-8")
return f"id: {eid}\nevent: {self.type.value}\ndata: {json_data}\n\n"
# Wire output for a progress event:
# id: 47
# event: progress
# data: {"type":"progress","job_id":"job_abc","data":{"progress":45,"status":"executing","message":"Scoring fit..."}}
#
# (blank line terminates each event)
The id: field enables reconnection. If the client loses the connection partway through a workflow (mobile network switch, tab backgrounded by the OS), it reconnects with a Last-Event-ID: 47 header. The server resumes streaming from event 48 onward. Without event IDs, a reconnecting client would see either a duplicate stream from the start or no events at all — both are broken experiences on mobile.
Milestone-Based Progress Calculation
The progress percentage shown to the user is calculated from tool milestone mappings defined in the workflow spec, not from the iteration count:
def _calculate_workflow_progress(state: dict, current_tool: str | None = None) -> int:
"""Map completed tools to progress percentages via spec milestones.
Falls back to iteration-based linear estimate when no milestones defined.
Returns percentage in range 5–95 (100% reserved for COMPLETED event).
"""
milestones = state.get("tool_progress_milestones", {})
if not milestones:
return _estimate_progress(state["iteration_count"], state["max_iterations"])
# Find the highest milestone achieved so far
max_progress = 5 # starting minimum
for msg in state.get("messages", []):
if hasattr(msg, "tool_calls"):
for tc in msg.tool_calls:
tool_name = tc.get("name", "")
if tool_name in milestones:
max_progress = max(max_progress, milestones[tool_name])
# Include current tool if it maps to a milestone
if current_tool and current_tool in milestones:
max_progress = max(max_progress, milestones[current_tool])
return min(max_progress, 95) # cap at 95 until COMPLETED
A spec with defined milestones produces a progress bar that jumps to meaningful checkpoints as tools complete:
# From a workflow spec (job_fit_score):
"tool_progress_milestones": {
"resume_profile_mcp__parse_document": 10,
"resume_profile_mcp__extract_profile": 25,
"resume_profile_mcp__score_fit": 45,
"text_analysis_mcp__extract_keywords": 65,
"store_artifact": 90,
"complete_workflow": 99
}
# Progress sequence: 5 → 10 → 25 → 45 → 65 → 90 → 99 → 100
# Each jump is meaningful: parsing done, profile extracted, fit scored...
Iteration-Based Fallback
For workflows without milestone definitions (or early in development before milestones are calibrated), progress falls back to linear interpolation over the iteration count:
def _estimate_progress(iteration: int, max_iterations: int) -> int:
"""Linear interpolation over iteration count.
Caps at 95% — 100% is reserved for the COMPLETED event.
A workflow at iteration 12 of 25 = 46% in fallback mode.
"""
if max_iterations <= 0:
return 50
progress = int((iteration / max_iterations) * 95)
return min(progress, 95)
Linear fallback is honest but imprecise. A workflow at iteration 60 of 120 is at 50% by count — but might be at 85% by actual work done if the remaining iterations are lightweight cleanup. Milestone-based progress reflects actual work; iteration-based fallback reflects elapsed effort. For user experience, the milestone approach is significantly better.
The SLOT_NEEDED Event
When the agent reaches the request_input graph node, it emits a SLOT_NEEDED event that carries everything the client needs to render a decision interface:
class SlotNeededEvent(SSEEvent):
def __init__(
self,
job_id: str,
job_spec_id: str,
pending_slots: list[dict], # slot definitions: type, prompt, options
context: str | None, # agent's summary of what it found
checkpoint_id: str | None, # resumes from here after slot is filled
step_output_preview: dict | None, # preview data (e.g., 3 career categories)
agent_message: str | None, # message to display above the decision UI
) -> None: ...
The step_output_preview field is particularly important for workflows like resume_to_job_ready: the agent analyzes the resume and generates 3 career category recommendations before pausing. The preview carries those recommendations as structured data, which the client renders as a choice UI — not raw text. The structured data allows the client to render radio buttons, category cards, or any other interactive form that the slot type definition specifies.
Dependency Inversion: The Progress Reporter Protocol
The agent doesn't depend directly on any specific event infrastructure. Instead, it depends on a protocol interface:
@runtime_checkable
class AgentProgressReporter(Protocol):
"""Protocol for agent progress reporting (dependency inversion).
The agent depends on this protocol, not on the concrete
SSE service or database implementation.
"""
async def report(
self,
job_spec_id: str,
progress: int,
status: str,
message: str | None = None,
**extra: Any,
) -> None: ...
The concrete implementation writes to both the SSE channel (for live clients) and the database (for clients that connect after the workflow completes). But the agent graph code never imports the concrete implementation — it receives a reporter object at initialization. This makes the agent testable with a mock reporter and allows the event infrastructure to evolve without touching agent logic.
A progress bar that shows 0% for 4 minutes then jumps to 100% is worse than no progress bar. The milestone system exists to give users a truthful signal at each meaningful phase boundary, not a performance of progress over a fixed time interval.
