# Python SDK

The Python SDK provides both synchronous (`Absurd`) and asynchronous
(`AsyncAbsurd`) clients for building durable workflows. It uses
psycopg (v3) for database access.
## Installation

Install the SDK from PyPI:

```shell
uv add absurd-sdk

# or with pip (not recommended)
pip install absurd-sdk
```
Before using the SDK, initialize the Absurd schema in Postgres and create at least one queue. See Database Setup and Migrations and the Quickstart.
## Creating a Client

### Synchronous

```python
from absurd_sdk import Absurd

# From a connection string
app = Absurd(
    "postgresql://user:pass@localhost:5432/mydb",
    queue_name="default",
)

# From an existing psycopg Connection
from psycopg import Connection

conn = Connection.connect("postgresql://...", autocommit=True)
app = Absurd(conn, queue_name="default")

# Minimal — uses ABSURD_DATABASE_URL, then PGDATABASE,
# then postgresql://localhost/absurd; queue defaults to "default"
app = Absurd()
```
### Asynchronous

```python
from absurd_sdk import AsyncAbsurd

app = AsyncAbsurd(
    "postgresql://user:pass@localhost:5432/mydb",
    queue_name="default",
)
```
### Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `conn_or_url` | `Connection \| str \| None` | `ABSURD_DATABASE_URL`, then `PGDATABASE`, then `postgresql://localhost/absurd` | Database connection or URL |
| `queue_name` | `str` | `"default"` | Default queue for operations |
| `default_max_attempts` | `int` | `5` | Default retry limit |
| `hooks` | `AbsurdHooks` | `None` | Lifecycle hooks |
## Registering Tasks

Use the `@register_task` decorator:

```python
@app.register_task("send-email")
def send_email(params, ctx):
    rendered = ctx.step("render", lambda: f"<h1>{params['template']}</h1>")
    ctx.step("send", lambda: {"accepted": [params["to"]], "html": rendered})
```
### Decorator Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Task name |
| `queue` | `str` | Client queue | Queue this task belongs to |
| `default_max_attempts` | `int` | Client default | Default max attempts |
| `default_cancellation` | `CancellationPolicy` | `None` | Default cancellation policy |
### Async Tasks

```python
app = AsyncAbsurd("postgresql://...")

@app.register_task("send-email")
async def send_email(params, ctx):
    async def render_email():
        return f"<h1>{params['template']}</h1>"

    rendered = await ctx.step("render", render_email)

    async def send_email_payload():
        return {"accepted": [params["to"]], "html": rendered}

    await ctx.step("send", send_email_payload)
```
## Spawning Tasks

```python
result = app.spawn(
    "send-email",
    {"to": "user@example.com", "template": "welcome"},
    max_attempts=10,
    retry_strategy={
        "kind": "exponential",
        "base_seconds": 2,
        "factor": 2,
        "max_seconds": 300,
    },
    headers={"trace_id": "..."},
    idempotency_key="welcome:user-42",
)
print(result["task_id"], result["run_id"])
```
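For intuition, the exponential strategy above can be read as computing the delay before retry `n` as `base_seconds * factor**n`, capped at `max_seconds`. The sketch below illustrates that arithmetic only; the function name is hypothetical and this is not the SDK's actual implementation:

```python
def exponential_delay(attempt, base_seconds=2, factor=2, max_seconds=300):
    # Hypothetical sketch: delay (in seconds) before retry number
    # `attempt` (0-based), capped at max_seconds.
    return min(base_seconds * factor ** attempt, max_seconds)

# With the strategy from the spawn call above:
# attempt 0 → 2s, attempt 3 → 16s, attempt 10 → capped at 300s
```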
### Spawn Parameters

| Parameter | Type | Description |
|---|---|---|
| `task_name` | `str` | Name of the task to spawn |
| `params` | `Any` | JSON-serializable parameters |
| `max_attempts` | `int` | Max retry attempts |
| `retry_strategy` | `RetryStrategy` | Backoff configuration |
| `headers` | `dict` | Metadata attached to the task |
| `queue` | `str` | Target queue (must match registration if registered) |
| `cancellation` | `CancellationPolicy` | Auto-cancellation policy |
| `idempotency_key` | `str` | Dedup key |
### SpawnResult

A `TypedDict` with fields: `task_id`, `run_id`, `attempt`.
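As a sketch, the shape is roughly the following; the field names come from the list above, while the concrete types (string identifiers, integer attempt counter) are assumptions:

```python
from typing import TypedDict

class SpawnResult(TypedDict):
    # Field names per the SDK docs; types are assumed here.
    task_id: str
    run_id: str
    attempt: int
```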
## Task Results

### fetch_task_result(task_id, queue_name=None)

Returns the current task result snapshot dataclass, or `None` if the task does
not exist.

```python
snapshot = app.fetch_task_result(task_id)
if snapshot is not None:
    print(snapshot.state, snapshot.result, snapshot.failure)
```
### await_task_result(task_id, timeout=None, queue_name=None)

Polls until the task reaches a terminal state (`completed`, `failed`,
`cancelled`). Raises `TimeoutError` if the timeout is reached.

```python
final = app.await_task_result(task_id, timeout=30)
if final.state == "failed":
    print(final.failure)
```

(The async client equivalents are `await app.fetch_task_result(...)` and
`await app.await_task_result(...)`; they return the same dataclass type.)
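Conceptually, the polling described above behaves like the sketch below, with `fetch` standing in for a bound `app.fetch_task_result` call. The names and the interval are illustrative, not the SDK's internals:

```python
import time

TERMINAL_STATES = {"completed", "failed", "cancelled"}

def poll_until_terminal(fetch, timeout=None, interval=0.25):
    # Sketch of the documented behavior: re-fetch the snapshot until
    # it reaches a terminal state, raising TimeoutError on expiry.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        snapshot = fetch()
        if snapshot is not None and snapshot.state in TERMINAL_STATES:
            return snapshot
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("task did not reach a terminal state")
        time.sleep(interval)
```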
## Task Context

### Synchronous (TaskContext)

#### ctx.task_id

The unique identifier of the current task.

#### ctx.headers

Read-only mapping of headers attached to the task.

#### ctx.step(name, fn)

Run an idempotent step. `fn` is a zero-argument callable whose return value
is cached.

```python
result = ctx.step("fetch-data", lambda: {"ok": True, "source": "demo"})
```
#### ctx.begin_step(name) + ctx.complete_step(handle, value)

Advanced form of `step()` for when you need to split checkpoint handling across
multiple calls.

```python
handle = ctx.begin_step("agent-turn")
if handle.done:
    state = handle.state
else:
    state = ctx.complete_step(handle, {"message": "hello"})
```
#### ctx.run_step

Decorator form of `step()`. The decorated function is immediately called and
replaced with its return value:

```python
@ctx.run_step
def payment():
    return {"charge_id": f"charge-{params['amount']}"}

# `payment` is now the return value, not the function
print(payment)
```

With a custom name:

```python
@ctx.run_step("process-payment")
def payment():
    return {"charge_id": f"charge-{params['amount']}"}
```
#### ctx.sleep_for(step_name, seconds)

Suspend the task for a duration.

```python
ctx.sleep_for("cooldown", 3600)
```
#### ctx.sleep_until(step_name, wake_at)

Suspend until an absolute time. Accepts a `datetime` or a UNIX timestamp
(`int`/`float`).

```python
from datetime import datetime, timezone

ctx.sleep_until("deadline", datetime(2025, 12, 31, tzinfo=timezone.utc))
```
#### ctx.await_event(event_name, step_name=None, timeout=None)

Suspend until a named event is emitted. Returns the event payload. Raises
`TimeoutError` if the timeout expires.

```python
payload = ctx.await_event("order.shipped", timeout=86400)
```
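Because the timeout surfaces as an exception, a handler can catch it and take a fallback path instead of failing. A sketch, with made-up handler and step names:

```python
def handle_order(params, ctx):
    # Wait up to a day for the shipment event; escalate on timeout
    # instead of letting the task fail.
    try:
        payload = ctx.await_event("order.shipped", timeout=86400)
    except TimeoutError:
        return ctx.step("escalate", lambda: {"escalated": True})
    return ctx.step("notify", lambda: {"tracking": payload.get("tracking")})
```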
#### ctx.await_task_result(task_id, queue_name=None, timeout=None, step_name=None)

Durably wait for another task's terminal result from inside a task handler.
The wait is checkpointed as a step (default step name:
`$awaitTaskResult:<task_id>`).

`queue_name` must point to a different queue than the one the currently
running task context belongs to.

```python
child = app.spawn("child-task", {}, queue="child-workers")
child_result = ctx.await_task_result(
    child["task_id"],
    queue_name="child-workers",
    timeout=60,
)
if child_result.state == "completed":
    print(child_result.result)
```
#### ctx.heartbeat(seconds=None)

Extend the current run's lease.

```python
ctx.heartbeat(300)
```

#### ctx.emit_event(event_name, payload=None)

Emit an event on the current queue.

```python
ctx.emit_event("order.completed", {"order_id": "42"})
```
### Asynchronous (AsyncTaskContext)

The async context has the same methods, all async:

```python
async def fetch_data():
    return {"ok": True, "source": "demo"}

result = await ctx.step("fetch-data", fetch_data)

handle = await ctx.begin_step("agent-turn")
result = (
    handle.state
    if handle.done
    else await ctx.complete_step(handle, {"ok": True})
)

await ctx.sleep_for("cooldown", 3600)

payload = await ctx.await_event("order.shipped")

child_result = await ctx.await_task_result(
    child_task_id,
    queue_name="child-workers",
    timeout=60,
)

await ctx.heartbeat(300)
await ctx.emit_event("order.completed", {"order_id": "42"})
```
## Events

```python
app.emit_event("order.shipped", {"tracking": "XYZ"})
app.emit_event("order.shipped", {"tracking": "XYZ"}, queue_name="orders")
```
## Cancellation

```python
app.cancel_task(task_id)
app.cancel_task(task_id, queue_name="other-queue")
```
## Retrying Failed Tasks

```python
result = app.retry_task(task_id, max_attempts=5, spawn_new=False)
```

Returns a `RetryTaskResult` with `task_id`, `run_id`, `attempt`, and `created`.
## Queue Management

```python
app.create_queue("emails")
app.drop_queue("emails")
queues = app.list_queues()  # ["default", "emails"]
```
## Starting a Worker

### Synchronous (Blocking)

```python
app.start_worker(
    worker_id="web-1",
    claim_timeout=120,
    concurrency=1,
    poll_interval=0.25,
)
```

Call `app.stop_worker()` from a signal handler for graceful shutdown.
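One way to wire that up, sketched as a hypothetical helper around the stdlib `signal` module; the helper name is invented, and `stop` stands in for a bound `app.stop_worker`:

```python
import signal

def install_graceful_shutdown(stop, signals=(signal.SIGTERM, signal.SIGINT)):
    # Hypothetical helper: invoke `stop` (e.g. app.stop_worker)
    # when one of the given signals arrives.
    def handler(signum, frame):
        stop()
    for sig in signals:
        signal.signal(sig, handler)
    return handler
```

For example, `install_graceful_shutdown(app.stop_worker)` just before the blocking `app.start_worker(...)` call.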
### Asynchronous

```python
await app.start_worker(
    worker_id="web-1",
    claim_timeout=120,
    concurrency=4,
    poll_interval=0.25,
)
```
### Single-Batch Processing

```python
app.work_batch(worker_id="cron-1", claim_timeout=120, batch_size=10)
```
## Context Variable

Access the current task context from anywhere in the call stack:

```python
from absurd_sdk import get_current_context

def helper():
    ctx = get_current_context()
    if ctx is not None:
        ctx.heartbeat(60)
```

Works for both sync and async contexts.
## Switching Between Sync and Async

```python
# Sync → Async
async_app = app.make_async()

# Async → Sync
sync_app = await async_app.make_sync()
```

Both share the same connection string but create independent connections.
## Hooks

### before_spawn

```python
def get_trace_id():
    return "trace-123"

def inject_trace(task_name, params, options):
    options["headers"] = {
        **(options.get("headers") or {}),
        "trace_id": get_trace_id(),
    }
    return options

app = Absurd(hooks={"before_spawn": inject_trace})
```
### wrap_task_execution

```python
from contextlib import contextmanager

@contextmanager
def start_trace(trace_id):
    yield

def with_tracing(ctx, execute):
    trace_id = ctx.headers.get("trace_id")
    with start_trace(trace_id):
        return execute()

app = Absurd(hooks={"wrap_task_execution": with_tracing})
```

Both hooks also accept async callables when used with `AsyncAbsurd`.
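For instance, the tracing wrapper could be written as an async callable like the sketch below, under the assumption that `execute` is awaitable when the hook itself is async; the placeholder tracer does nothing:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def start_trace(trace_id):
    # Placeholder tracer; a real one would open and close a span here.
    yield

async def with_tracing_async(ctx, execute):
    async with start_trace(ctx.headers.get("trace_id")):
        return await execute()
```

It would then be passed as `hooks={"wrap_task_execution": with_tracing_async}` when constructing `AsyncAbsurd`.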
## Error Types

| Error | Description |
|---|---|
| `SuspendTask` | Internal: task suspended. Never visible to user code. |
| `CancelledTask` | Internal: task was cancelled. |
| `FailedTask` | Internal: run already failed. |
| `TimeoutError` | Raised by `await_event()` / `await_task_result()` when the timeout expires. |
## Closing

```python
# Sync
app.close()

# Async
await app.close()
```

Stops the worker and closes the connection if it was created by the client.