# Python SDK

The Python SDK provides both synchronous (`Absurd`) and asynchronous (`AsyncAbsurd`) clients for building durable workflows. It uses `psycopg` (v3) for database access.

## Installation

Install the SDK from PyPI:
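Assuming the package is published under the name `absurd-sdk` (the import name shown throughout is `absurd_sdk`):

```shell
pip install absurd-sdk
```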
## Creating a Client

### Synchronous

```python
from absurd_sdk import Absurd

# From a connection string (or ABSURD_DATABASE_URL env var)
app = Absurd("postgresql://user:pass@localhost:5432/mydb", queue_name="default")

# From an existing psycopg Connection
from psycopg import Connection

conn = Connection.connect("postgresql://...", autocommit=True)
app = Absurd(conn, queue_name="default")

# Minimal: uses ABSURD_DATABASE_URL and queue "default"
app = Absurd()
```

### Asynchronous

```python
from absurd_sdk import AsyncAbsurd

app = AsyncAbsurd("postgresql://user:pass@localhost:5432/mydb", queue_name="default")
```
### Constructor Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `conn_or_url` | `Connection \| str \| None` | `ABSURD_DATABASE_URL` or `postgresql://localhost/absurd` | Database connection or URL |
| `queue_name` | `str` | `"default"` | Default queue for operations |
| `default_max_attempts` | `int` | `5` | Default retry limit |
| `hooks` | `AbsurdHooks` | `None` | Lifecycle hooks |
## Registering Tasks

Use the `@app.register_task` decorator:

```python
@app.register_task("send-email")
def send_email(params, ctx):
    rendered = ctx.step("render", lambda: render_template(params["template"]))
    ctx.step("send", lambda: mailer.send(to=params["to"], html=rendered))
```
### Decorator Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Task name |
| `queue` | `str` | Client queue | Queue this task belongs to |
| `default_max_attempts` | `int` | Client default | Default max attempts |
| `default_cancellation` | `CancellationPolicy` | `None` | Default cancellation policy |
### Async Tasks

```python
app = AsyncAbsurd("postgresql://...")

@app.register_task("send-email")
async def send_email(params, ctx):
    rendered = await ctx.step("render", render_template_async)
    await ctx.step("send", lambda: mailer.send(to=params["to"], html=rendered))
```
## Spawning Tasks

```python
result = app.spawn(
    "send-email",
    {"to": "user@example.com", "template": "welcome"},
    max_attempts=10,
    retry_strategy={"kind": "exponential", "base_seconds": 2, "factor": 2, "max_seconds": 300},
    headers={"trace_id": "..."},
    idempotency_key="welcome:user-42",
)
print(result["task_id"], result["run_id"])
```
### Spawn Parameters

| Parameter | Type | Description |
|---|---|---|
| `task_name` | `str` | Name of the task to spawn |
| `params` | `Any` | JSON-serializable parameters |
| `max_attempts` | `int` | Max retry attempts |
| `retry_strategy` | `RetryStrategy` | Backoff configuration |
| `headers` | `dict` | Metadata attached to the task |
| `queue` | `str` | Target queue (must match registration if registered) |
| `cancellation` | `CancellationPolicy` | Auto-cancellation policy |
| `idempotency_key` | `str` | Deduplication key |
### SpawnResult

A `TypedDict` with fields: `task_id`, `run_id`, `attempt`.
## Task Results

### `fetch_task_result(task_id, queue_name=None)`

Returns the current task result snapshot dataclass, or `None` if the task does not exist.

```python
snapshot = app.fetch_task_result(task_id)
if snapshot is not None:
    print(snapshot.state, snapshot.result, snapshot.failure)
```
### `await_task_result(task_id, timeout=None, queue_name=None)`

Polls until the task reaches a terminal state (`completed`, `failed`, or `cancelled`). Raises `TimeoutError` if the timeout is reached.
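For example, blocking for up to 30 seconds on a spawned task (a sketch built from the signature above):

```python
result = app.spawn("send-email", {"to": "user@example.com", "template": "welcome"})

try:
    snapshot = app.await_task_result(result["task_id"], timeout=30)
    print(snapshot.state)  # one of: completed, failed, cancelled
except TimeoutError:
    print("task did not reach a terminal state within 30 seconds")
```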
(The async client equivalents are `await app.fetch_task_result(...)` and `await app.await_task_result(...)`; they return the same dataclass type.)
## Task Context

### Synchronous (TaskContext)

#### `ctx.task_id`

The unique identifier of the current task.

#### `ctx.headers`

Read-only mapping of headers attached to the task.

#### `ctx.step(name, fn)`

Run an idempotent step. `fn` is a zero-argument callable whose return value is cached.

#### `ctx.begin_step(name)` + `ctx.complete_step(handle, value)`

Advanced form of `step()` for when you need to split checkpoint handling across multiple calls.

```python
handle = ctx.begin_step("agent-turn")
if handle.done:
    state = handle.state
else:
    state = ctx.complete_step(handle, {"message": "hello"})
```
#### `ctx.run_step`

Decorator form of `step()`. The decorated function is immediately called and replaced with its return value:

```python
@ctx.run_step
def payment():
    return stripe.charges.create(amount=params["amount"])

# `payment` is now the return value, not the function
print(payment)
```

With a custom name:

```python
@ctx.run_step("process-payment")
def payment():
    return stripe.charges.create(amount=params["amount"])
```
#### `ctx.sleep_for(step_name, seconds)`

Suspend the task for a duration.
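For example, pausing for an hour between steps (mirroring the async usage shown later in this page):

```python
ctx.sleep_for("cooldown", 3600)  # suspends the task; a worker resumes it after an hour
```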
#### `ctx.sleep_until(step_name, wake_at)`

Suspend until an absolute time. Accepts a `datetime` or a UNIX timestamp (`int`/`float`).

```python
from datetime import datetime, timezone

ctx.sleep_until("deadline", datetime(2025, 12, 31, tzinfo=timezone.utc))
```
#### `ctx.await_event(event_name, step_name=None, timeout=None)`

Suspend until a named event is emitted. Returns the event payload. Raises `TimeoutError` if the timeout expires.
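A sketch combining an explicit step name with a timeout, based on the signature above:

```python
try:
    payload = ctx.await_event("order.shipped", step_name="wait-shipped", timeout=300)
    tracking = payload["tracking"]
except TimeoutError:
    tracking = None  # no shipment event arrived within 5 minutes
```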
#### `ctx.await_task_result(task_id, queue_name=None, timeout=None, step_name=None)`

Durably wait for another task's terminal result from inside a task handler. The wait is checkpointed as a step (default step name: `$awaitTaskResult:<task_id>`).

`queue_name` must point to a queue different from the one the current task is running on.

```python
child = app.spawn("child-task", {}, queue="child-workers")
child_result = ctx.await_task_result(
    child["task_id"],
    queue_name="child-workers",
    timeout=60,
)
if child_result.state == "completed":
    print(child_result.result)
```
#### `ctx.heartbeat(seconds=None)`

Extend the current run's lease.

#### `ctx.emit_event(event_name, payload=None)`

Emit an event on the current queue.
### Asynchronous (AsyncTaskContext)

The async context has the same methods, all async:

```python
result = await ctx.step("fetch-data", fetch_from_api_async)

handle = await ctx.begin_step("agent-turn")
result = handle.state if handle.done else await ctx.complete_step(handle, {"ok": True})

await ctx.sleep_for("cooldown", 3600)
payload = await ctx.await_event("order.shipped")

child_result = await ctx.await_task_result(
    child_task_id,
    queue_name="child-workers",
    timeout=60,
)

await ctx.heartbeat(300)
await ctx.emit_event("order.completed", {"order_id": "42"})
```
## Events

```python
app.emit_event("order.shipped", {"tracking": "XYZ"})
app.emit_event("order.shipped", {"tracking": "XYZ"}, queue_name="orders")
```
## Cancellation
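The cancellation API itself is not shown in this excerpt. What the surrounding sections do establish is that tasks can carry a `CancellationPolicy` (via `default_cancellation` at registration or `cancellation` at spawn time). A sketch of the spawn-side override, with the policy shape and any explicit cancel call being assumptions, not confirmed API:

```python
# The dict shape of CancellationPolicy is an assumption for illustration only.
result = app.spawn(
    "send-email",
    {"to": "user@example.com"},
    cancellation={"kind": "timeout", "seconds": 600},  # assumed shape
)
```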
## Retrying Failed Tasks

Returns a `RetryTaskResult` with fields `task_id`, `run_id`, `attempt`, and `created`.
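The retry call itself is elided in this excerpt; only its return type is described. Assuming the method is named `retry_task` and the result is accessed like the `SpawnResult` dict shown earlier (both are assumptions), usage might look like:

```python
retry = app.retry_task(task_id)  # `retry_task` is an assumed name
if retry["created"]:  # dict-style access is also assumed
    print(retry["run_id"], retry["attempt"])
```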
## Queue Management

```python
app.create_queue("emails")
app.drop_queue("emails")
queues = app.list_queues()  # ["default", "emails"]
```
## Starting a Worker

### Synchronous (Blocking)

Call `app.stop_worker()` from a signal handler for graceful shutdown.
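The worker start call is elided in this excerpt. Assuming it is named `start_worker()` (only `stop_worker()` is documented above), a graceful-shutdown setup might look like:

```python
import signal

# stop_worker() is documented above; start_worker() is an assumed name.
signal.signal(signal.SIGTERM, lambda signum, frame: app.stop_worker())
signal.signal(signal.SIGINT, lambda signum, frame: app.stop_worker())

app.start_worker()  # blocks, processing tasks until stop_worker() is called
```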
### Asynchronous

### Single-Batch Processing
## Context Variable

Access the current task context from anywhere in the call stack:

```python
from absurd_sdk import get_current_context

def helper():
    ctx = get_current_context()
    if ctx is not None:
        ctx.heartbeat(60)
```

Works for both sync and async contexts.
## Switching Between Sync and Async

Both clients can share the same connection string but create independent connections.
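For example, the same application might use the sync client in scripts and the async client in an async web service, both pointed at the same database (the constructors match those shown under "Creating a Client"):

```python
from absurd_sdk import Absurd, AsyncAbsurd

DATABASE_URL = "postgresql://user:pass@localhost:5432/mydb"

sync_app = Absurd(DATABASE_URL, queue_name="default")        # e.g. for CLI scripts
async_app = AsyncAbsurd(DATABASE_URL, queue_name="default")  # e.g. for async handlers
```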
## Hooks

### `before_spawn`

```python
def inject_trace(task_name, params, options):
    options["headers"] = {**(options.get("headers") or {}), "trace_id": get_trace_id()}
    return options

app = Absurd(hooks={"before_spawn": inject_trace})
```

### `wrap_task_execution`

```python
def with_tracing(ctx, execute):
    trace_id = ctx.headers.get("trace_id")
    with start_trace(trace_id):
        return execute()

app = Absurd(hooks={"wrap_task_execution": with_tracing})
```

Both hooks also accept async callables when used with `AsyncAbsurd`.
## Error Types

| Error | Description |
|---|---|
| `SuspendTask` | Internal: task suspended. Never visible to user code. |
| `CancelledTask` | Internal: task was cancelled. |
| `FailedTask` | Internal: run already failed. |
| `TimeoutError` | Raised by `await_event()` / `await_task_result()` when a timeout expires. |
## Closing

Stops the worker and closes the connection if it was created by the client.
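The method name is not shown in this excerpt; assuming it is `close()`, a typical shutdown might look like:

```python
app = Absurd("postgresql://...")
try:
    app.start_worker()  # assumed name, see "Starting a Worker"
finally:
    app.close()  # assumed name: stops the worker, closes the client-owned connection
```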