POST /v1/workflows/{workflow_id}/block-tests/execute
from retab import Retab

client = Retab()

# Single test
batch = client.workflows.tests.execute(
    workflow_id="wf_abc123xyz",
    test_id="wfnodetest_hsLEQiM61ez9Piv147MWk",
)

# All tests for a single block
batch = client.workflows.tests.execute(
    workflow_id="wf_abc123xyz",
    target={"type": "block", "block_id": "block_extract_invoice"},
    n_consensus=5,
)

# Every test in the workflow
batch = client.workflows.tests.execute(
    workflow_id="wf_abc123xyz",
)

print(f"Batch {batch.batch_id} queued — poll job {batch.job_id}")
{
  "batch_id": "btbatch_q1z2",
  "job_id": "job_abc",
  "status": "queued",
  "workflow_id": "wf_abc123xyz",
  "target": { "type": "block", "block_id": "block_extract_invoice" },
  "test_id": null,
  "total_tests": 4
}


Run one block test, all tests for a single block, or every test in a workflow.

Execution is asynchronous: the response returns immediately with a batch_id and a job_id. Poll jobs.retrieve(job_id) until the job reaches a terminal status, then read the parsed batch result from job.response.body.

The request body selects the scope by providing at most one of test_id and target:
  • test_id — run a single test by id.
  • target — run all tests for a single block ({ type: "block", block_id: ... }).
  • neither — run every test in the workflow.
n_consensus is optional and only meaningful for extract / split / classifier blocks. Allowed values are 3, 5, or 7. Provide it to override the block’s configured consensus count for this batch only.
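The selection rule and the n_consensus constraint above can be checked client-side before a request is sent. The following is a minimal sketch: build_execute_body is a hypothetical helper, not part of the Retab SDK, and it only assembles the JSON body described on this page.

```python
from typing import Any, Optional

# Values this page lists as allowed for n_consensus.
ALLOWED_N_CONSENSUS = {3, 5, 7}


def build_execute_body(
    test_id: Optional[str] = None,
    block_id: Optional[str] = None,
    n_consensus: Optional[int] = None,
) -> dict[str, Any]:
    """Assemble the request body (hypothetical helper, not an SDK function)."""
    # test_id and target are mutually exclusive; omitting both runs every test.
    if test_id is not None and block_id is not None:
        raise ValueError("provide test_id or a block target, not both")
    if n_consensus is not None and n_consensus not in ALLOWED_N_CONSENSUS:
        raise ValueError("n_consensus must be 3, 5, or 7")

    body: dict[str, Any] = {}
    if test_id is not None:
        body["test_id"] = test_id
    if block_id is not None:
        body["target"] = {"type": "block", "block_id": block_id}
    if n_consensus is not None:
        body["n_consensus"] = n_consensus
    return body


print(build_execute_body(block_id="block_extract_invoice", n_consensus=5))
```

Calling build_execute_body() with no arguments returns an empty body, which corresponds to running every test in the workflow.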

Polling for the batch

Poll jobs.retrieve(job_id) until the job reaches a terminal status (completed, failed, cancelled, or expired). When the job completes, job.response.body is the BlockTestBatchExecutionResult.
python
import time

from retab.types.workflows import BlockTestBatchExecutionResult

batch = client.workflows.tests.execute(workflow_id="wf_abc123xyz")
deadline = time.monotonic() + 600
while True:
    job = client.jobs.retrieve(batch.job_id)
    if job.status in ("completed", "failed", "cancelled", "expired"):
        break
    if time.monotonic() >= deadline:
        raise TimeoutError(f"Timed out waiting for block-test job {batch.job_id}")
    time.sleep(2)

if job.status != "completed":
    raise RuntimeError(f"Block-test job failed: {job.error}")

payload = job.response.body if job.response is not None else {}
result = BlockTestBatchExecutionResult.model_validate(payload)
print(f"{result.counts.passed}/{len(result.results)} passed")
for item in result.results:
    print(item.test_id, item.status, item.duration_ms)
javascript
const batch = await client.workflows.tests.execute({
  workflowId: "wf_abc123xyz",
});

const deadlineMs = Date.now() + 600_000;
let job;
while (true) {
  job = await client.jobs.retrieve(batch.job_id);
  if (["completed", "failed", "cancelled", "expired"].includes(job.status)) {
    break;
  }
  if (Date.now() >= deadlineMs) {
    throw new Error(`Timed out waiting for block-test job ${batch.job_id}`);
  }
  await new Promise((resolve) => setTimeout(resolve, 2000));
}

if (job.status !== "completed") {
  throw new Error(`Block-test job failed: ${JSON.stringify(job.error)}`);
}

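const result = job.response?.body;
if (!result) {
  // A completed job without a body cannot be summarized below.
  throw new Error(`Block-test job ${batch.job_id} completed without a response body`);
}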
console.log(`${result.counts.passed}/${result.results.length} passed`);
for (const item of result.results) {
  console.log(item.test_id, item.status, item.duration_ms);
}
go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	retab "github.com/retab-dev/retab/clients/go"
)

func main() {
	ctx := context.Background()

	client, err := retab.NewClient("")
	if err != nil {
		log.Fatal(err)
	}

	batch, err := client.Workflows.Tests.Execute(ctx, retab.ExecuteBlockTestsRequest{
		WorkflowID: "wf_abc123xyz",
	})
	if err != nil {
		log.Fatal(err)
	}

	deadline := time.Now().Add(10 * time.Minute)
	var job *retab.Job
	for {
		job, err = client.Jobs.Retrieve(ctx, batch.JobID, nil)
		if err != nil {
			log.Fatal(err)
		}
		status, _ := (*job)["status"].(string)
		if status == "completed" || status == "failed" || status == "cancelled" || status == "expired" {
			break
		}
		if time.Now().After(deadline) {
			log.Fatalf("timed out waiting for block-test job %s", batch.JobID)
		}
		time.Sleep(2 * time.Second)
	}
	if status, _ := (*job)["status"].(string); status != "completed" {
		log.Fatalf("block-test job failed: %v", (*job)["error"])
	}

	response, _ := (*job)["response"].(map[string]any)
	result, _ := response["body"].(map[string]any)
	counts, _ := result["counts"].(map[string]any)
	results, _ := result["results"].([]any)
	fmt.Printf("%v/%d passed\n", counts["passed"], len(results))
	for _, item := range results {
		entry, _ := item.(map[string]any)
		fmt.Println(entry["test_id"], entry["status"], entry["duration_ms"])
	}
}
Treat failed, cancelled, and expired as terminal failures. If the polling deadline elapses before a terminal status, raise or return a timeout from your application code.
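The polling rule above can be factored into a small reusable function. This is a sketch under stated assumptions: wait_for_job is a hypothetical helper (not an SDK function), the retrieve argument is any zero-argument callable that returns an object with a status attribute, and the sleep function is injectable so the loop can be exercised without real delays.

```python
import time
from typing import Any, Callable

# Terminal statuses named on this page.
TERMINAL_STATUSES = {"completed", "failed", "cancelled", "expired"}


def wait_for_job(
    retrieve: Callable[[], Any],
    timeout_s: float = 600.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call retrieve() until the job is terminal; raise on failure or timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        job = retrieve()
        if job.status in TERMINAL_STATUSES:
            break
        if time.monotonic() >= deadline:
            raise TimeoutError("timed out waiting for block-test job")
        sleep(interval_s)
    # failed, cancelled, and expired are all treated as terminal failures.
    if job.status != "completed":
        raise RuntimeError(f"block-test job ended with status {job.status!r}")
    return job
```

With the client from the examples above, a call would look like wait_for_job(lambda: client.jobs.retrieve(batch.job_id)).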

What the job’s response.body contains

After jobs.retrieve(job_id) returns a completed job, response.body is the BlockTestBatchExecutionResult:
{
  "workflow_id": "wf_abc123xyz",
  "target": { "type": "block", "block_id": "block_extract_invoice" },
  "counts": {
    "queued": 0,
    "running": 0,
    "passed": 3,
    "failed": 1,
    "blocked": 0,
    "error": 0,
    "cancelled": 0
  },
  "results": [
    {
      "test_id": "wfnodetest_a",
      "run_record_id": "wfnodetestrun_a",
      "status": "passed",
      "workflow_id": "wf_abc123xyz",
      "target": { "type": "block", "block_id": "block_extract_invoice" },
      "duration_ms": 18221
    }
  ]
}
Each BlockTestBatchExecutionItem carries the run_record_id you can pass to Get Block Test Run to fetch the full execution snapshot (inputs, outputs, assertion_result, verdict_summary, etc.). The counts object has one bucket per run-record status value.
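As an illustration of how the counts and run_record_id fields might be used together, the sketch below summarizes a response body held as a plain dict and collects the run records worth inspecting. summarize_batch is a hypothetical helper, the second result entry (wfnodetest_b) is invented for the example, and the total assumes the count buckets partition the tests in the batch.

```python
from typing import Any


def summarize_batch(body: dict[str, Any]) -> tuple[int, int, list[str]]:
    """Return (passed, total, run_record_ids of results that did not pass)."""
    counts = body.get("counts", {})
    passed = counts.get("passed", 0)
    # Assumption: every test lands in exactly one status bucket.
    total = sum(counts.values())
    not_passed = [
        item["run_record_id"]
        for item in body.get("results", [])
        if item.get("status") != "passed"
    ]
    return passed, total, not_passed


example = {
    "counts": {"queued": 0, "running": 0, "passed": 3, "failed": 1,
               "blocked": 0, "error": 0, "cancelled": 0},
    "results": [
        {"test_id": "wfnodetest_a", "run_record_id": "wfnodetestrun_a", "status": "passed"},
        # Illustrative entry, not taken from the sample response above.
        {"test_id": "wfnodetest_b", "run_record_id": "wfnodetestrun_b", "status": "failed"},
    ],
}
print(summarize_batch(example))  # (3, 4, ['wfnodetestrun_b'])
```

The returned run_record_id values are the ones to pass to Get Block Test Run when investigating a failure.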

Authorizations

Api-Key · string · header · required

Path Parameters

workflow_id · string · required

Body

application/json
test_id · string | null
target · WorkflowTestBlockTarget · object

Public workflow-test target.

The storage layer remains block-scoped today, but the API shape names the tested entity explicitly so workflow-level targets can be added later.

n_consensus · integer | null

Response

Successful Response

batch_id · string · required
job_id · string · required
workflow_id · string · required
total_tests · integer · required
status · string · default: queued · allowed value: "queued"
target · WorkflowTestBlockTarget · object

Public workflow-test target.

The storage layer remains block-scoped today, but the API shape names the tested entity explicitly so workflow-level targets can be added later.

test_id · string | null