Validating an AI workflow is harder than validating a deterministic function. The output varies between runs. There is no exact expected output to compare against. A workflow that technically completes — reaches the finish node, stores an artifact, emits a COMPLETED event — may still produce output that fails to serve its purpose. Two validation tiers address these two distinct failure modes.

Two Validation Tiers

Tier 1 validates the workflow lifecycle: does the job move through the correct status transitions, respond to slot requests, produce the required artifact format, and terminate cleanly? This is structural — it can be evaluated by inspecting events, status fields, and artifact metadata without reading artifact content.

Tier 2 validates the workflow goal: does the artifact content actually achieve what the workflow promises? A resume workflow that produces a valid PDF artifact has passed Tier 1. A resume workflow that produces a PDF containing only the original document, unchanged, has failed Tier 2.
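Because Tier 1 is purely structural, it can be expressed as assertions over lifecycle records alone. A minimal sketch of a status-transition check — the `created`, `confirmed`, and `running` states here are hypothetical; the real engine may define more — could look like:

```python
# Hypothetical status graph. The terminal and slot states match the ones
# used later in this document; the earlier states are assumptions.
ALLOWED = {
    "created": {"confirmed"},
    "confirmed": {"running"},
    "running": {"slots_pending", "completed", "partial", "failed"},
    "slots_pending": {"running"},
}

def transitions_valid(statuses: list[str]) -> bool:
    """Tier-1 structural check: every observed transition must be allowed."""
    return all(b in ALLOWED.get(a, set())
               for a, b in zip(statuses, statuses[1:]))
```

A run that pauses for slots and resumes — `["created", "confirmed", "running", "slots_pending", "running", "completed"]` — passes; any out-of-order jump fails, without ever reading artifact content.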

Tier 1: The Workflow Runner

The WorkflowRunner orchestrates a full workflow lifecycle against the real infrastructure — not mocks. It submits a job, handles any SLOT_NEEDED pauses, and polls until a terminal state:

import pytest  # lifecycle failures surface as test failures

class WorkflowRunner:
    async def run(
        self,
        workflow_id: str,
        file_ids: list[str],
        initial_slots: dict | None = None,
    ) -> WorkflowResult:
        # 1. Submit job, check preflight validation
        job = await self._create_job(workflow_id, file_ids, initial_slots)
        if not job["preflightResult"].get("passed", True):
            pytest.fail(f"Preflight failed: {job['preflightResult']['errors']}")

        # 2. Confirm and start execution
        await self._confirm(workflow_id, job["jobSpecId"])

        # 3. Poll until terminal state, handling up to 10 slot rounds
        result = await self._poll_with_checkpoints(...)

        # 4. Fetch artifacts
        artifacts = await self._fetch_artifacts(workflow_id, job["jobSpecId"])

        return WorkflowResult(
            status=result["status"],
            artifacts=artifacts["artifacts"],
            slot_log=...,  # every slot request + response recorded
        )

The checkpoint polling loop runs up to MAX_CHECKPOINT_ROUNDS = 10 times. Each round polls until either a terminal state or a slots_pending state. In the latter case, the runner fills the pending slots with test-appropriate answers and re-confirms, then resumes polling:

async def _poll_with_checkpoints(self, ...) -> dict:
    for round_num in range(MAX_CHECKPOINT_ROUNDS):
        job = await self._poll_until(
            stop_statuses={"slots_pending", "completed", "partial", "failed"}
        )

        if job["status"] in ("completed", "partial"):
            return job  # success

        if job["status"] == "failed":
            pytest.fail(f"Job failed: {job.get('errorMessage')}")

        if job["status"] == "slots_pending":
            # Fill slots with test data and re-confirm
            answers = _pick_slot_answers(job["pendingSlots"])
            await self._fill_and_confirm(answers)
            # continue polling in next round

    pytest.fail(f"Did not complete after {MAX_CHECKPOINT_ROUNDS} checkpoint rounds")

Artifact Format Guardrail

The agent itself enforces a format guardrail before allowing complete_workflow to succeed. If the workflow spec declares output_specs, the agent checks that every required format has been produced:

def _check_artifact_guardrail(
    artifacts: list[dict],
    output_specs: list[dict],
    has_store_tool: bool,
) -> str | None:
    """Returns error message if guardrail rejects completion, else None."""
    if output_specs:
        required = {spec["format"].lower() for spec in output_specs if spec.get("format")}
        produced = {_MIME_TO_FORMAT.get(a.get("mime_type", ""), "") for a in artifacts}
        missing = required - produced
        if missing:
            return f"Required output formats {missing} not yet produced."
        return None

    # Backward compatibility: if no output_specs, require at least one artifact
    if has_store_tool and not artifacts:
        return "store_artifact must be called before complete_workflow."
    return None

# Format registry:
_MIME_TO_FORMAT = {
    "text/plain": "txt",
    "application/pdf": "pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
    "application/json": "json",
}

If the guardrail rejects completion, the agent receives the error message as a tool result and is expected to produce the missing format before retrying complete_workflow. This is a soft rejection that keeps the workflow alive rather than terminating it — the agent has the opportunity to correct the gap.

Tier 2: The Goal Validator

The GoalValidator evaluates artifact text against per-workflow quality criteria. It does not judge whether the output is "good" in a subjective sense — it checks whether the output contains the structural and content markers that a correct output must have:

class GoalValidator:
    def validate(self, text: str) -> tuple[bool, list[str]]:
        failures = []

        # 1. Minimum length
        if len(text) < self._criteria["min_length"]:
            failures.append(f"Too short: {len(text)} chars")

        # 2. Required sections (synonym group matching)
        for synonyms in self._criteria.get("required_sections", []):
            if not any(s.lower() in text.lower() for s in synonyms):
                failures.append(f"Missing section: expected one of {synonyms}")

        # 3. Anti-patterns (things that MUST NOT appear)
        for pattern in self._criteria.get("anti_patterns", []):
            if pattern.lower() in text.lower():
                failures.append(f"Anti-pattern found: '{pattern}'")

        # 4. Quality checks (present/absent assertions with min_matches)
        for check in self._criteria.get("quality_checks", []):
            if check["type"] == "present":
                matched = sum(1 for p in check["patterns"]
                              if p.lower() in text.lower())
                if matched < check.get("min_matches", 1):
                    failures.append(f"Quality check failed: {check['reason']}")
            elif check["type"] == "absent":
                found = [p for p in check["patterns"] if p.lower() in text.lower()]
                if found:
                    failures.append(f"Forbidden pattern found: {found}")

        # 5. Structural checks
        for check in self._criteria.get("structural_checks", []):
            if check["type"] == "min_paragraphs":
                paragraphs = [p for p in text.split("\n\n") if p.strip()]
                if len(paragraphs) < check["count"]:
                    failures.append(f"Too few paragraphs: {len(paragraphs)}")

        return len(failures) == 0, failures

Synonym group matching is an important design choice. A resume must have an "experience" section — but the heading might be "Work Experience," "Employment History," "Professional Experience," or just "Experience." Requiring an exact string match would cause false failures on valid output. The synonym group passes if any one synonym matches:

# From resume_to_job_ready criteria:
"required_sections": [
    ["summary", "professional summary", "overview"],
    ["experience", "work experience", "employment history"],
    ["skills", "technical skills", "core competencies"],
]
# Anti-pattern example (cover letter content in a resume):
"anti_patterns": [
    "i am writing to apply",
]
# Quality check example (cover letter):
"quality_checks": [
    {
        "type": "absent",
        "patterns": ["to whom it may concern", "i am writing to apply for"],
        "reason": "Generic opening phrases undermine cover letter quality"
    }
]
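The synonym-group rule is small enough to exercise in isolation. A sketch of the check from step 2 of the validator, applied to two candidate resumes:

```python
def section_present(text: str, synonyms: list[str]) -> bool:
    """A section passes if any one synonym appears in the text (case-insensitive)."""
    lowered = text.lower()
    return any(s.lower() in lowered for s in synonyms)

experience_synonyms = ["experience", "work experience", "employment history"]

# Heading uses a synonym rather than the literal word "experience" -> passes.
resume_a = "EMPLOYMENT HISTORY\nAcme Corp, 2019-2024"

# No synonym in the group matches -> this section is reported missing.
resume_b = "CAREER\nAcme Corp, 2019-2024"
```

`section_present(resume_a, experience_synonyms)` is True while the same call on `resume_b` is False — exactly the behavior that makes an exact-match requirement too brittle for real headings.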

Running All Workflows in One Test Suite

All active workflows are covered by a single parametrized test function. The test IDs are derived from the workflow spec IDs, making failures easy to identify in CI output:

# Each workflow gets its own test case with an isolated runner
@pytest.mark.parametrize("workflow_id", ACTIVE_WORKFLOW_IDS)
async def test_workflow_end_to_end(workflow_id, runner, test_files):
    # Tier 1: lifecycle validation
    result = await runner.run(
        workflow_id=workflow_id,
        file_ids=test_files[workflow_id],
    )
    assert result.status in ("completed", "partial")
    assert len(result.artifacts) >= 1, "Must produce at least one artifact"

    # Tier 2: goal validation (if criteria defined for this workflow)
    validator = GoalValidator(workflow_id)
    if validator.has_criteria():
        artifact_text = result.artifacts[0]["content"]
        passed, failures = validator.validate(artifact_text)
        assert passed, f"Goal validation failed: {failures}"

The goal validator is optional — not all workflows have defined quality criteria yet. Workflows without criteria pass on lifecycle checks alone. Adding criteria is a deliberate act: it requires someone to specify what "correct output" looks like for that workflow, which surfaces implicit quality assumptions as explicit, testable assertions.

A workflow that completes successfully but produces the wrong output is worse than a workflow that fails loudly — because it looks like it worked. The two-tier framework ensures we catch both: structural failures in Tier 1, and semantic failures in Tier 2.