The Jobs API enables asynchronous processing for document operations. Instead of waiting for a synchronous response, you submit a job that processes in the background and poll for results when ready.

Why Use the Jobs API?

The Jobs API provides several advantages over synchronous requests:
  • Avoid request timeouts: Process large documents that would exceed HTTP timeout limits
  • Resilient to disconnections: Jobs continue processing even if your client disconnects
  • Higher throughput: Queue multiple jobs without waiting for each to complete
  • Progress tracking: Monitor job status and get detailed error information
Use the Jobs API when processing documents that may take more than a few seconds, when you need to process multiple documents in parallel, or when you want guaranteed delivery of results.

Getting Started

This guide walks you through creating a job, polling for completion, and retrieving results.

1. Prepare Your Request

Before creating a job, prepare your document and any required parameters. Documents can be provided as base64-encoded data URLs:
import base64

# Load and encode your document
with open("invoice.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "invoice.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }
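
Since every job request embeds the document this same way, it can be worth wrapping the encoding once. A small sketch (the helper name and MIME-type guessing are our own additions, not part of the SDK):
import base64
import mimetypes

def encode_document(path: str) -> dict:
    """Read a local file and return the document dict used in job requests."""
    mime, _ = mimetypes.guess_type(path)
    with open(path, "rb") as f:
        content = base64.b64encode(f.read()).decode()
    return {"filename": path, "url": f"data:{mime or 'application/octet-stream'};base64,{content}"}

document = encode_document("invoice.pdf")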

2. Create the Job

Submit your job by specifying the target endpoint and the request body:
from retab import Retab

client = Retab()

job = client.jobs.create(
    endpoint="/v1/documents/extract",
    request={
        "document": document,
        "json_schema": schema,
        "model": "retab-small",
    },
)

print(f"Job created: {job.id}")
print(f"Status: {job.status}")  # "queued"

3. Check Job Status

Poll the job until it reaches a terminal status:
import time

while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")
SDK note: jobs.retrieve(...) returns a lightweight job object by default in both the Python and JavaScript clients; use retrieve_full (Python) or retrieveFull (JavaScript) when you need the request or response payloads. Instead of writing the polling loop yourself, you can also use the SDK helper method:
completed_job = client.jobs.wait_for_completion(
    job.id,
    poll_interval_seconds=2.0,  # seconds between polls
    timeout_seconds=300         # max seconds to wait
)

4. Retrieve Results

If you polled with the lightweight jobs.retrieve(...), fetch the full payload before reading the response field:
if job.status == "completed":
    job = client.jobs.retrieve_full(job.id)
    result = job.response.body
    print(result)
elif job.status == "failed":
    print(f"Error: {job.error.message}")

5. Cancel a Job (Optional)

Cancel a job that is queued or in progress:
cancelled_job = client.jobs.cancel(job.id)
print(f"Status: {cancelled_job.status}")  # "cancelled"

6. List Jobs

View all your jobs, with optional filtering. By default, GET /v1/jobs returns a lightweight list for fast polling:
  • request is omitted (null)
  • response is omitted (null)
Use GET /v1/jobs?include_request=true&include_response=true if you need full payloads in list results. For most workflows, list jobs for discovery and call retrieve_full (Python) or retrieveFull (JavaScript) on specific job IDs for full details, as sketched after the example below.
# List all jobs
jobs_response = client.jobs.list(limit=20)
for job in jobs_response.data:
    print(f"{job.id}: {job.status}")

# Filter by status
queued_jobs = client.jobs.list(status="queued")

# Paginate through results
if jobs_response.has_more:
    next_page = client.jobs.list(after=jobs_response.last_id)
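
Putting the two halves together: list lightweight jobs for discovery, then fetch full payloads only for the ones you need. A sketch using the calls shown above:
# Pull full payloads only for completed jobs
completed = client.jobs.list(status="completed", limit=20)
for job in completed.data:
    full = client.jobs.retrieve_full(job.id)
    print(f"{full.id}: HTTP {full.response.status_code}")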

Complete Examples

Extract Structured Data

Extract structured data from documents using a JSON schema:
import base64
import json
import time
from retab import Retab

client = Retab()

# Load document
with open("invoice.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "invoice.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }

# Define extraction schema
schema = {
    "type": "object",
    "properties": {
        "invoice_number": {"type": "string"},
        "date": {"type": "string"},
        "total_amount": {"type": "number"},
        "vendor_name": {"type": "string"},
    },
    "required": ["invoice_number", "total_amount"],
}

# Create extraction job
job = client.jobs.create(
    endpoint="/v1/documents/extract",
    request={
        "document": document,
        "json_schema": schema,
        "model": "retab-small",
    },
)
print(f"Job created: {job.id}")

# Poll until complete
while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")

# Get extracted data
if job.status == "completed":
    job = client.jobs.retrieve_full(job.id)
    extracted = job.response.body["choices"][0]["message"]["parsed"]
    print(f"Invoice: {extracted['invoice_number']}")
    print(f"Total: ${extracted['total_amount']}")
else:
    print(f"Job failed: {job.error.message}")

Parse Document to Text

Convert documents to text or markdown format:
import base64
import time
from retab import Retab

client = Retab()

# Load document
with open("document.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "document.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }

# Create parse job
job = client.jobs.create(
    endpoint="/v1/documents/parse",
    request={
        "document": document,
    },
)
print(f"Job created: {job.id}")

# Poll until complete
while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")

# Get parsed content
if job.status == "completed":
    job = client.jobs.retrieve_full(job.id)
    result = job.response.body
    print(f"Parsed text:\n{result['text']}")
    print(f"Total pages: {len(result['pages'])}")
else:
    print(f"Job failed: {job.error.message}")

Classify Documents

Classify documents into predefined categories:
import base64
import time
from retab import Retab

client = Retab()

# Load document
with open("document.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "document.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }

# Define categories
categories = [
    {"name": "Invoice", "description": "Invoice or bill documents"},
    {"name": "Bank Statement", "description": "Bank account statements"},
    {"name": "Receipt", "description": "Payment receipts"},
    {"name": "Contract", "description": "Legal contracts or agreements"},
    {"name": "ID Document", "description": "Identity documents"},
    {"name": "Other", "description": "Other document types"},
]

# Create classification job
job = client.jobs.create(
    endpoint="/v1/documents/classify",
    request={
        "document": document,
        "model": "retab-small",
        "categories": categories,
    },
)
print(f"Job created: {job.id}")

# Poll until complete
while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")

# Get classification result
if job.status == "completed":
    job = client.jobs.retrieve_full(job.id)
    result = job.response.body["result"]
    print(f"Classification: {result['classification']}")
    print(f"Reasoning: {result['reasoning']}")
else:
    print(f"Job failed: {job.error.message}")

Supported Endpoints

The Jobs API supports the following endpoints:
Endpoint                       Description
/v1/documents/extract          Extract structured data from documents
/v1/documents/parse            Parse documents to text/markdown
/v1/documents/split            Split multi-page documents
/v1/documents/classify         Classify documents into categories
/v1/schemas/generate           Generate JSON schemas from documents
/v1/edit/agent/fill            Fill form fields using an AI agent
/v1/edit/templates/fill        Fill templates with data
/v1/edit/templates/generate    Generate form schemas from PDFs
/v1/projects/extract           Extract using a project configuration
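
Every endpoint above follows the same create/poll/retrieve pattern; only the request body changes. You could wrap that pattern once (a sketch built only from the calls documented in this guide; the run_job name is our own):
def run_job(endpoint: str, request: dict, timeout_seconds: float = 300) -> dict:
    """Create a job, wait for a terminal status, and return the response body."""
    job = client.jobs.create(endpoint=endpoint, request=request)
    job = client.jobs.wait_for_completion(job.id, timeout_seconds=timeout_seconds)
    if job.status != "completed":
        raise RuntimeError(f"Job {job.id} ended with status {job.status}")
    return client.jobs.retrieve_full(job.id).response.body

# For example, parse a document through the same wrapper
parsed = run_job("/v1/documents/parse", {"document": document})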

Job Lifecycle

Jobs progress through the following statuses:
validating → queued → in_progress → completed/failed/cancelled/expired
Status        Description
validating    The request is being validated before the job can begin
queued        Job is waiting in the processing queue
in_progress   Job is currently being processed by the executor
completed     Job finished successfully; results available in response
failed        Job execution failed; see the error field for details
cancelled     Job was cancelled by user request
expired       Job data expired (7 days after creation)
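
The polling examples in this guide repeat the set of terminal statuses inline; if you poll in several places, centralizing them avoids drift (the constant below is our own convention, not part of the SDK):
import time

TERMINAL_STATUSES = {"completed", "failed", "cancelled", "expired"}

while job.status not in TERMINAL_STATUSES:
    time.sleep(2)
    job = client.jobs.retrieve(job.id)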

Rate Limits and Constraints

The Jobs API has the following limits:
Constraint                  Limit
Metadata key-value pairs    16 maximum
Metadata key length         64 characters
Metadata value length       512 characters
Concurrent jobs             Unlimited
Jobs are processed in the order they are created. There is no limit on the number of concurrent jobs, but processing throughput depends on system capacity.
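
These metadata limits can be checked client-side before submitting, so a bad request fails fast instead of surfacing as a validation error later. A sketch enforcing the limits from the table above (the helper is our own):
def validate_metadata(metadata: dict) -> None:
    """Raise ValueError if metadata exceeds the documented limits."""
    if len(metadata) > 16:
        raise ValueError("Metadata supports at most 16 key-value pairs")
    for key, value in metadata.items():
        if len(key) > 64:
            raise ValueError(f"Metadata key exceeds 64 characters: {key!r}")
        if len(str(value)) > 512:
            raise ValueError(f"Metadata value for {key!r} exceeds 512 characters")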

Job Response Structure

When a job completes successfully, the result is available in the response field:
{
  "id": "job_abc123xyz",
  "object": "job",
  "status": "completed",
  "endpoint": "/v1/documents/extract",
  "request": { ... },
  "response": {
    "status_code": 200,
    "body": {
      "choices": [
        {
          "message": {
            "parsed": {
              "invoice_number": "INV-2024-001",
              "total": 1250.00
            }
          }
        }
      ]
    }
  },
  "created_at": 1705420800,
  "started_at": 1705420802,
  "completed_at": 1705420810,
  "organization_id": "org_xxx"
}
Note: in list responses (GET /v1/jobs) and retrieve responses (GET /v1/jobs/{job_id}), the request and response fields are null by default unless include_request/include_response query parameters are enabled.
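
If you call the REST API directly instead of through an SDK, the same toggles are plain query parameters. A sketch with the requests library (the base URL and auth header name are assumptions; check your API reference):
import os
import requests

resp = requests.get(
    f"https://api.retab.com/v1/jobs/{job_id}",  # base URL is an assumption
    params={"include_request": "true", "include_response": "true"},
    headers={"Api-Key": os.environ["RETAB_API_KEY"]},  # header name is an assumption
)
data = resp.json()
print(data["status"], data["response"]["status_code"] if data["response"] else None)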

Error Handling

When a job fails, the error field contains details about what went wrong:
{
  "id": "job_abc123xyz",
  "status": "failed",
  "error": {
    "code": "validation_error",
    "message": "Invalid JSON schema: missing required field 'type'",
    "details": { ... }
  }
}
Jobs can also include non-fatal warnings. For example, if restoring request/response documents from storage fails during retrieve or list, the affected document objects may include gcs_error placeholders such as download_failed or reconstruct_failed. Handle errors in your code:
job = client.jobs.wait_for_completion(job_id)

if job.status == "failed":
    print(f"Error code: {job.error.code}")
    print(f"Error message: {job.error.message}")
    if job.error.details:
        print(f"Details: {job.error.details}")

Best Practices

Polling Strategy

  • Use reasonable poll intervals (2-5 seconds) to balance responsiveness and API usage
  • Set timeouts to avoid indefinite waiting for stuck jobs
  • Use the wait_for_completion helper method when possible
# Recommended: Use the helper with timeout
try:
    job = client.jobs.wait_for_completion(
        job.id,
        poll_interval_seconds=2.0,
        timeout_seconds=300
    )
except TimeoutError:
    print("Job did not complete within timeout")

Using Metadata

Use the metadata field to attach context to jobs for tracking and filtering:
job = client.jobs.create(
    endpoint="/v1/documents/extract",
    request={ ... },
    metadata={
        "batch_id": "batch_20240115",
        "source": "email_attachments",
        "user_id": "user_123",
    }
)
Metadata is preserved throughout the job lifecycle and can help with:
  • Tracking jobs by batch or source system
  • Debugging failed jobs
  • Filtering jobs when listing (see the sketch below)
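
If you need to narrow listed jobs by metadata rather than status, a client-side filter over paginated results works (this assumes metadata is returned on listed jobs; whether the list endpoint supports server-side metadata filters is not covered here):
# Collect all jobs from one batch, paginating through the list
batch_jobs = []
page = client.jobs.list(limit=100)
while True:
    batch_jobs += [j for j in page.data if (j.metadata or {}).get("batch_id") == "batch_20240115"]
    if not page.has_more:
        break
    page = client.jobs.list(limit=100, after=page.last_id)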

Error Recovery

Implement retry logic for transient failures:
import time

MAX_RETRIES = 3

for attempt in range(MAX_RETRIES):
    job = client.jobs.create(endpoint=endpoint, request=request)
    job = client.jobs.wait_for_completion(job.id, timeout_seconds=300)

    if job.status == "completed":
        break
    # failed, cancelled, and expired are all terminal; retry on any of them
    if attempt < MAX_RETRIES - 1:
        print(f"Attempt {attempt + 1} ended with status '{job.status}', retrying...")
        time.sleep(5)
    else:
        raise RuntimeError(f"Job did not succeed after {MAX_RETRIES} attempts (last status: {job.status})")

Processing Multiple Documents

For batch processing, create jobs in parallel and collect results:
import base64
import concurrent.futures

# Assumes `client` and `schema` are defined as in the extraction example above

def process_document(doc_path):
    with open(doc_path, "rb") as f:
        content = base64.b64encode(f.read()).decode()

    job = client.jobs.create(
        endpoint="/v1/documents/extract",
        request={
            "document": {"filename": doc_path, "url": f"data:application/pdf;base64,{content}"},
            "json_schema": schema,
            "model": "retab-small",
        }
    )
    return client.jobs.wait_for_completion(job.id)

# Process documents in parallel
documents = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(process_document, documents))