The Jobs API enables asynchronous processing for document operations. Instead of waiting for a synchronous response, you submit a job that processes in the background and poll for results when ready.

Why Use the Jobs API?

The Jobs API provides several advantages over synchronous requests:
  • Avoid request timeouts: Process large documents that would exceed HTTP timeout limits
  • Resilient to disconnections: Jobs continue processing even if your client disconnects
  • Higher throughput: Queue multiple jobs without waiting for each to complete
  • 7-day result retention: Retrieve results anytime within the retention window
  • Progress tracking: Monitor job status and get detailed error information
Use the Jobs API when processing documents that may take more than a few seconds, when you need to process multiple documents in parallel, or when you want guaranteed delivery of results.

Getting Started

This guide walks you through creating a job, polling for completion, and retrieving results.

1. Prepare Your Request

Before creating a job, prepare your document and any required parameters. Documents can be provided as base64-encoded data URLs:
import base64

# Load and encode your document
with open("invoice.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "invoice.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }

2. Create the Job

Submit your job by specifying the target endpoint and the request body:
from retab import Retab

client = Retab()

job = client.jobs.create(
    endpoint="/v1/documents/extract",
    request={
        "document": document,
        "json_schema": schema,
        "model": "retab-small",
    },
)

print(f"Job created: {job.id}")
print(f"Status: {job.status}")  # "queued"

3. Check Job Status

Poll the job until it reaches a terminal status:
import time

while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")
Or use the SDK helper method:
completed_job = client.jobs.wait_for_completion(
    job.id,
    poll_interval=2.0,  # seconds between polls
    timeout=300         # max seconds to wait
)

4. Retrieve Results

Once the job completes, access the results from the response field:
if job.status == "completed":
    result = job.response.body
    print(result)
elif job.status == "failed":
    print(f"Error: {job.error.message}")

5. Cancel a Job (Optional)

Cancel a job that is queued or in progress:
cancelled_job = client.jobs.cancel(job.id)
print(f"Status: {cancelled_job.status}")  # "cancelled"

6. List Jobs

View all your jobs with optional filtering:
# List all jobs
jobs_response = client.jobs.list(limit=20)
for job in jobs_response.data:
    print(f"{job.id}: {job.status}")

# Filter by status
queued_jobs = client.jobs.list(status="queued")

# Paginate through results
if jobs_response.has_more:
    next_page = client.jobs.list(after=jobs_response.last_id)

Complete Examples

Extract Structured Data

Extract structured data from documents using a JSON schema:
import base64
import json
import time
from retab import Retab

client = Retab()

# Load document
with open("invoice.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "invoice.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }

# Define extraction schema
schema = {
    "type": "object",
    "properties": {
        "invoice_number": {"type": "string"},
        "date": {"type": "string"},
        "total_amount": {"type": "number"},
        "vendor_name": {"type": "string"},
    },
    "required": ["invoice_number", "total_amount"],
}

# Create extraction job
job = client.jobs.create(
    endpoint="/v1/documents/extract",
    request={
        "document": document,
        "json_schema": schema,
        "model": "retab-small",
    },
)
print(f"Job created: {job.id}")

# Poll until complete
while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")

# Get extracted data
if job.status == "completed":
    extracted = job.response.body["choices"][0]["message"]["parsed"]
    print(f"Invoice: {extracted['invoice_number']}")
    print(f"Total: ${extracted['total_amount']}")
else:
    print(f"Job failed: {job.error.message}")

Parse Document to Text

Convert documents to text or markdown format:
import base64
import time
from retab import Retab

client = Retab()

# Load document
with open("document.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "document.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }

# Create parse job
job = client.jobs.create(
    endpoint="/v1/documents/parse",
    request={
        "document": document,
    },
)
print(f"Job created: {job.id}")

# Poll until complete
while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")

# Get parsed content
if job.status == "completed":
    result = job.response.body
    print(f"Parsed text:\n{result['text']}")
    print(f"Total pages: {len(result['pages'])}")
else:
    print(f"Job failed: {job.error.message}")

Classify Documents

Classify documents into predefined categories:
import base64
import time
from retab import Retab

client = Retab()

# Load document
with open("document.pdf", "rb") as f:
    content = base64.b64encode(f.read()).decode()
    document = {
        "filename": "document.pdf",
        "url": f"data:application/pdf;base64,{content}",
    }

# Define categories
categories = [
    {"name": "Invoice", "description": "Invoice or bill documents"},
    {"name": "Bank Statement", "description": "Bank account statements"},
    {"name": "Receipt", "description": "Payment receipts"},
    {"name": "Contract", "description": "Legal contracts or agreements"},
    {"name": "ID Document", "description": "Identity documents"},
    {"name": "Other", "description": "Other document types"},
]

# Create classification job
job = client.jobs.create(
    endpoint="/v1/documents/classify",
    request={
        "document": document,
        "model": "retab-small",
        "categories": categories,
    },
)
print(f"Job created: {job.id}")

# Poll until complete
while job.status not in ("completed", "failed", "cancelled", "expired"):
    time.sleep(2)
    job = client.jobs.retrieve(job.id)
    print(f"Status: {job.status}")

# Get classification result
if job.status == "completed":
    result = job.response.body["result"]
    print(f"Classification: {result['classification']}")
    print(f"Reasoning: {result['reasoning']}")
else:
    print(f"Job failed: {job.error.message}")

Supported Endpoints

The Jobs API supports the following endpoints:
Endpoint                       Description
/v1/documents/extract          Extract structured data from documents
/v1/documents/parse            Parse documents to text/markdown
/v1/documents/split            Split multi-page documents
/v1/documents/classify         Classify documents into categories
/v1/schemas/generate           Generate JSON schemas from documents
/v1/edit/agent/fill            Fill form fields using an AI agent
/v1/edit/templates/fill        Fill templates with data
/v1/edit/templates/generate    Generate form schemas from PDFs
/v1/projects/extract           Extract using a project configuration
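Job creation follows the same pattern for every endpoint in this table: only the endpoint path and the request body change. As an illustrative sketch (run_job is a local helper, not part of the SDK), you can wrap the create-and-wait calls shown above:
from retab import Retab

client = Retab()

def run_job(endpoint: str, request: dict, timeout: float = 300):
    # Create a job against any supported endpoint and wait for a terminal status
    job = client.jobs.create(endpoint=endpoint, request=request)
    return client.jobs.wait_for_completion(job.id, poll_interval=2.0, timeout=timeout)

# The same wrapper drives different endpoints; `document` and `schema` are
# prepared as in the Complete Examples section below
parse_job = run_job("/v1/documents/parse", {"document": document})
extract_job = run_job(
    "/v1/documents/extract",
    {"document": document, "json_schema": schema, "model": "retab-small"},
)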

Job Lifecycle

Jobs progress through the following statuses:
validating → queued → in_progress → completed/failed/cancelled/expired
Status        Description
validating    The request is being validated before the job can begin
queued        Job is waiting in the processing queue
in_progress   Job is currently being processed by the executor
completed     Job finished successfully; results available in the response field
failed        Job execution failed; see the error field for details
cancelled     Job was cancelled by user request
expired       Job data expired (7 days after creation)
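The polling loops in this guide treat the last four statuses as terminal, meaning the job will never change state again. A minimal sketch of that distinction (these helpers are illustrative, not part of the SDK):
TERMINAL_STATUSES = {"completed", "failed", "cancelled", "expired"}

def is_terminal(status: str) -> bool:
    # A job in any of these states will not transition again
    return status in TERMINAL_STATUSES

def describe(job) -> str:
    if job.status == "completed":
        return "done: results are in job.response"
    if job.status == "failed":
        return f"failed: {job.error.message}"
    if is_terminal(job.status):
        return f"terminal: {job.status}"
    return f"still running: {job.status}"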

Rate Limits and Constraints

The Jobs API has the following limits:
Constraint                  Limit
Metadata key-value pairs    16 maximum
Metadata key length         64 characters
Metadata value length       512 characters
Concurrent jobs             Unlimited
Result retention            7 days
Jobs are processed in the order they are created. There is no limit on the number of concurrent jobs, but processing throughput depends on system capacity.
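If you attach metadata to jobs, you can catch limit violations client-side before submitting. A minimal sketch assuming string keys and values (the server enforces the same limits regardless; this helper is not part of the SDK):
def validate_metadata(metadata: dict) -> None:
    # Mirror the documented limits: 16 pairs, 64-character keys, 512-character values
    if len(metadata) > 16:
        raise ValueError("at most 16 metadata key-value pairs are allowed")
    for key, value in metadata.items():
        if len(key) > 64:
            raise ValueError(f"metadata key too long: {key!r}")
        if len(str(value)) > 512:
            raise ValueError(f"metadata value too long for key {key!r}")

validate_metadata({"batch_id": "batch_20240115", "source": "email_attachments"})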

Job Expiration

Jobs and their associated data expire 7 days after creation:
  • Both the request and response data are deleted at expiration
  • The job record remains but with status: "expired" and empty request/response
  • Check the expires_at timestamp on the job object to know when data will be deleted
  • Retrieve and store results before expiration if you need long-term retention (see the storage sketch below)
# Check expiration timestamp
from datetime import datetime

expires_at = datetime.fromtimestamp(job.expires_at)
print(f"Results available until: {expires_at}")

Job Response Structure

When a job completes successfully, the result is available in the response field:
{
  "id": "job_abc123xyz",
  "object": "job",
  "status": "completed",
  "endpoint": "/v1/documents/extract",
  "request": { ... },
  "response": {
    "status_code": 200,
    "body": {
      "choices": [
        {
          "message": {
            "parsed": {
              "invoice_number": "INV-2024-001",
              "total": 1250.00
            }
          }
        }
      ]
    }
  },
  "created_at": 1705420800,
  "started_at": 1705420802,
  "completed_at": 1705420810,
  "expires_at": 1706025600,
  "organization_id": "org_xxx"
}
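A short sketch of reading these fields from the job object, assuming the SDK exposes created_at and completed_at as attributes holding Unix timestamps, the same way status, response, and expires_at are used elsewhere in this guide:
from datetime import datetime

job = client.jobs.retrieve("job_abc123xyz")
if job.status == "completed":
    extracted = job.response.body["choices"][0]["message"]["parsed"]
    elapsed = job.completed_at - job.created_at  # Unix epoch seconds
    print(f"Finished at {datetime.fromtimestamp(job.completed_at)} ({elapsed}s after creation)")
    print(extracted)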

Error Handling

When a job fails, the error field contains details about what went wrong:
{
  "id": "job_abc123xyz",
  "status": "failed",
  "error": {
    "code": "validation_error",
    "message": "Invalid JSON schema: missing required field 'type'",
    "details": { ... }
  }
}
Handle errors in your code:
job = client.jobs.wait_for_completion(job_id)

if job.status == "failed":
    print(f"Error code: {job.error.code}")
    print(f"Error message: {job.error.message}")
    if job.error.details:
        print(f"Details: {job.error.details}")

Best Practices

Polling Strategy

  • Use reasonable poll intervals (2-5 seconds) to balance responsiveness and API usage
  • Set timeouts to avoid indefinite waiting for stuck jobs
  • Use the wait_for_completion helper method when possible
# Recommended: Use the helper with timeout
try:
    job = client.jobs.wait_for_completion(
        job.id,
        poll_interval=2.0,
        timeout=300
    )
except TimeoutError:
    print("Job did not complete within timeout")

Using Metadata

Use the metadata field to attach context to jobs for tracking and filtering:
job = client.jobs.create(
    endpoint="/v1/documents/extract",
    request={ ... },
    metadata={
        "batch_id": "batch_20240115",
        "source": "email_attachments",
        "user_id": "user_123",
    }
)
Metadata is preserved throughout the job lifecycle and can help with:
  • Tracking jobs by batch or source system
  • Debugging failed jobs
  • Filtering jobs when listing (see the sketch below)
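Whether the list endpoint supports server-side metadata filters is not covered here; as a sketch, you can filter a listed page client-side, assuming listed jobs expose their metadata as a dict:
# Collect jobs from a listed page that belong to a given batch
batch_jobs = [
    job
    for job in client.jobs.list(limit=20).data
    if job.metadata and job.metadata.get("batch_id") == "batch_20240115"
]
for job in batch_jobs:
    print(f"{job.id}: {job.status}")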

Error Recovery

Implement retry logic for transient failures:
import time

MAX_RETRIES = 3

for attempt in range(MAX_RETRIES):
    job = client.jobs.create(endpoint=endpoint, request=request)
    job = client.jobs.wait_for_completion(job.id, timeout=300)

    if job.status == "completed":
        break
    elif job.status == "failed":
        if attempt < MAX_RETRIES - 1:
            print(f"Attempt {attempt + 1} failed, retrying...")
            time.sleep(5)
        else:
            raise Exception(f"Job failed after {MAX_RETRIES} attempts: {job.error.message}")

Processing Multiple Documents

For batch processing, create jobs in parallel and collect results:
import base64
import concurrent.futures

# `client` and `schema` are assumed to be defined as in the extraction example above

def process_document(doc_path):
    with open(doc_path, "rb") as f:
        content = base64.b64encode(f.read()).decode()

    job = client.jobs.create(
        endpoint="/v1/documents/extract",
        request={
            "document": {"filename": doc_path, "url": f"data:application/pdf;base64,{content}"},
            "json_schema": schema,
            "model": "retab-small",
        }
    )
    return client.jobs.wait_for_completion(job.id)

# Process documents in parallel
documents = ["doc1.pdf", "doc2.pdf", "doc3.pdf"]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(process_document, documents))