Go SDK
The Go SDK provides an idiomatic, typed client for building durable workflows
with Absurd. It uses the standard library's database/sql package and is
driver-agnostic: use any Postgres database/sql driver (pgx, pq, etc.).
The API is intentionally Go-shaped:
- task handlers take context.Context
- task definitions are typed with absurd.Task(...)
- durable task operations are package-level helpers such as absurd.Step() and absurd.AwaitEvent()
- task metadata is available via absurd.TaskFromContext() / absurd.MustTaskContext()
Installation
go get github.com/earendil-works/absurd/sdks/go/absurd@latest
Then import it:
import "github.com/earendil-works/absurd/sdks/go/absurd"
Before using the SDK, initialize the Absurd schema in Postgres and create at least one queue. See Database Setup and Migrations, the Quickstart, and absurdctl.
Creating a Client
From a connection string
package main

import (
    "log"
    "os"

    "github.com/earendil-works/absurd/sdks/go/absurd"
    _ "github.com/jackc/pgx/v5/stdlib"
)

func main() {
    app, err := absurd.New(absurd.Options{
        DriverName:  "pgx",
        DatabaseURL: os.Getenv("ABSURD_DATABASE_URL"),
        QueueName:   "default",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer app.Close()
}
From an existing *sql.DB
package main

import (
    "database/sql"
    "log"
    "os"

    "github.com/earendil-works/absurd/sdks/go/absurd"
    _ "github.com/jackc/pgx/v5/stdlib"
)

func main() {
    db, err := sql.Open("pgx", os.Getenv("PGDATABASE"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    app, err := absurd.New(absurd.Options{
        DB:        db,
        QueueName: "default",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer app.Close()
}
Minimal
app, err := absurd.New(absurd.Options{})
if err != nil {
    log.Fatal(err)
}
defer app.Close()
This uses:
- Options.DriverName (default: "postgres")
- Options.DatabaseURL, if provided
- otherwise ABSURD_DATABASE_URL
- otherwise PGDATABASE
- otherwise postgresql://localhost/absurd
The default queue is "default".
If you do not pass Options.DB, ensure the corresponding
database/sql Postgres driver is imported in your program.
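The fallback order above can be sketched with the standard library alone. The helper name resolveDatabaseURL is hypothetical and not part of the absurd package, and treating an empty environment variable as unset is an assumption; the SDK may handle that case differently.

```go
package main

import (
	"fmt"
	"os"
)

// resolveDatabaseURL is a hypothetical helper that mirrors the documented
// fallback order: explicit option, ABSURD_DATABASE_URL, PGDATABASE, default.
// Assumption: an empty environment variable counts as "not set".
func resolveDatabaseURL(explicit string, getenv func(string) string) string {
	if explicit != "" {
		return explicit
	}
	for _, key := range []string{"ABSURD_DATABASE_URL", "PGDATABASE"} {
		if v := getenv(key); v != "" {
			return v
		}
	}
	return "postgresql://localhost/absurd"
}

func main() {
	// Show which connection string the documented order would select
	// in the current environment when Options.DatabaseURL is empty.
	fmt.Println(resolveDatabaseURL("", os.Getenv))
}
```

Passing the environment lookup as a function keeps the sketch easy to exercise without mutating the real process environment.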
Options
| Option | Type | Default | Description |
|---|---|---|---|
| DB | *sql.DB | nil | Existing database handle. If set, it wins over DatabaseURL. |
| DriverName | string | "postgres" | database/sql driver name used with sql.Open when DB is not provided |
| DatabaseURL | string | ABSURD_DATABASE_URL, then PGDATABASE, then postgresql://localhost/absurd | Connection string used when DB is not provided |
| QueueName | string | "default" | Default queue for spawning and workers |
| DefaultMaxAttempts | int | 5 | Default retry limit |
| Logger | Logger | stdlib logger | Logger with a Printf method |
| Hooks | Hooks | zero value | Lifecycle hooks |
Swapping PostgreSQL drivers (pgx, pq, others)
The SDK works with any Postgres driver that integrates with database/sql.
Option A: provide your own *sql.DB (recommended for apps with existing DB wiring)
import (
    "database/sql"

    _ "github.com/jackc/pgx/v5/stdlib"
)

db, err := sql.Open("pgx", dsn)
if err != nil {
    return err
}

app, err := absurd.New(absurd.Options{
    DB:        db,
    QueueName: "default",
})
To switch to lib/pq, change only the driver import and the driver name:
import _ "github.com/lib/pq"
db, err := sql.Open("postgres", dsn)
Option B: let Absurd open the connection
import _ "github.com/jackc/pgx/v5/stdlib"
app, err := absurd.New(absurd.Options{
    DriverName:  "pgx",
    DatabaseURL: dsn,
    QueueName:   "default",
})
For lib/pq, set DriverName: "postgres" and import _ "github.com/lib/pq".
Registering Tasks
Define a typed task with absurd.Task(...), then register it with
app.Register(...) or app.MustRegister(...).
package main

import (
    "context"
    "log"

    "github.com/earendil-works/absurd/sdks/go/absurd"
    _ "github.com/jackc/pgx/v5/stdlib"
)

type SendEmailParams struct {
    To       string `json:"to"`
    Template string `json:"template"`
}

type SendEmailResult struct {
    Accepted []string `json:"accepted"`
    HTML     string   `json:"html"`
}

var sendEmailTask = absurd.Task(
    "send-email",
    func(
        ctx context.Context,
        params SendEmailParams,
    ) (SendEmailResult, error) {
        rendered, err := absurd.Step(
            ctx,
            "render",
            func(ctx context.Context) (string, error) {
                return "<h1>" + params.Template + "</h1>", nil
            },
        )
        if err != nil {
            return SendEmailResult{}, err
        }

        return absurd.Step(
            ctx,
            "send",
            func(ctx context.Context) (SendEmailResult, error) {
                return SendEmailResult{
                    Accepted: []string{params.To},
                    HTML:     rendered,
                }, nil
            },
        )
    },
)

func main() {
    app, err := absurd.New(absurd.Options{
        QueueName:  "default",
        DriverName: "pgx",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer app.Close()

    app.MustRegister(sendEmailTask)
}
TaskOptions
Pass them as the optional third argument to absurd.Task(...):
absurd.Task(
    "send-email",
    handler,
    absurd.TaskOptions{
        QueueName:          "emails",
        DefaultMaxAttempts: 10,
        DefaultCancellation: &absurd.CancellationPolicy{
            MaxDuration: 3600,
        },
    },
)
| Option | Type | Default | Description |
|---|---|---|---|
| QueueName | string | Client queue | Queue this task belongs to |
| DefaultMaxAttempts | int | Client default | Default max attempts |
| DefaultCancellation | *CancellationPolicy | nil | Default cancellation policy |
Spawning Tasks
ctx := context.Background()

result, err := app.Spawn(
    ctx,
    "send-email",
    SendEmailParams{To: "user@example.com", Template: "welcome"},
    absurd.SpawnOptions{
        MaxAttempts: 10,
        RetryStrategy: &absurd.RetryStrategy{
            Kind:        "exponential",
            BaseSeconds: 2,
            Factor:      2,
            MaxSeconds:  300,
        },
        Headers:        map[string]any{"trace_id": "..."},
        IdempotencyKey: "welcome:user-42",
    },
)
if err != nil {
    log.Fatal(err)
}

log.Printf(
    "task=%s run=%s created=%v",
    result.TaskID,
    result.RunID,
    result.Created,
)
If you already have a typed task definition, you can also spawn through it:
result, err := sendEmailTask.Spawn(
    ctx,
    app,
    SendEmailParams{To: "user@example.com", Template: "welcome"},
)
SpawnOptions
| Option | Type | Description |
|---|---|---|
| QueueName | string | Target queue for unregistered tasks |
| MaxAttempts | int | Max retry attempts |
| RetryStrategy | *RetryStrategy | Backoff configuration |
| Headers | map[string]any | Metadata attached to the task |
| Cancellation | *CancellationPolicy | Auto-cancellation policy |
| IdempotencyKey | string | Dedup key |
SpawnResult
| Field | Type | Description |
|---|---|---|
| TaskID | string | Unique task identifier |
| RunID | string | Current run identifier |
| Attempt | int | Attempt number |
| Created | bool | false if an existing task was returned |
Task Results
Queue-targeted control APIs in the Go SDK take the queue name explicitly.
Usually that means passing app.QueueName().
app.FetchTaskResult(ctx, queueName, taskID)
Returns a *TaskResultSnapshot, or nil if the task does not exist.
snapshot, err := app.FetchTaskResult(ctx, app.QueueName(), taskID)
if err != nil {
    log.Fatal(err)
}
if snapshot != nil {
    log.Println(snapshot.State)
}
app.AwaitTaskResult(ctx, queueName, taskID, options...)
Polls until the task reaches a terminal state (completed, failed, or
cancelled). Returns a *absurd.TimeoutError if the timeout elapses first.
final, err := app.AwaitTaskResult(
    ctx,
    app.QueueName(),
    taskID,
    absurd.AwaitTaskResultOptions{Timeout: 30 * time.Second},
)
if err != nil {
    log.Fatal(err)
}
log.Println(final.State)
If the task does not exist, the returned error wraps absurd.ErrTaskNotFound.
TaskResultSnapshot
| Field | Type | Description |
|---|---|---|
| State | TaskResultState | Current task state |
| Result | json.RawMessage | Final result payload |
| Failure | json.RawMessage | Failure payload |
Helper methods:
- snapshot.DecodeResult(&dst)
- snapshot.DecodeFailure(&dst)
- snapshot.IsTerminal()
Task Context and Durable Operations
Inside a task handler, the active task metadata lives on the context.Context.
Use absurd.TaskFromContext() or absurd.MustTaskContext() when you need task
IDs, headers, or cross-queue task waits.
task := absurd.MustTaskContext(ctx)
log.Println(task.TaskID(), task.RunID(), task.Attempt(), task.Headers())
absurd.Step(ctx, name, fn)
Run an idempotent step. The result is checkpointed in Postgres.
result, err := absurd.Step(
    ctx,
    "fetch-data",
    func(ctx context.Context) (map[string]any, error) {
        return map[string]any{"ok": true, "source": "demo"}, nil
    },
)
absurd.BeginStep(ctx, name) + handle.CompleteStep(ctx, value)
Advanced form of Step() when you need to split checkpoint handling across
multiple calls.
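handle, err := absurd.BeginStep[map[string]any](ctx, "agent-turn")
if err != nil {
    return err
}
if handle.Done {
    // The step already has a checkpoint; use the stored state.
    log.Println(handle.State)
} else {
    // No checkpoint yet: do the work, then persist the result.
    if _, err := handle.CompleteStep(ctx, map[string]any{"message": "hello"}); err != nil {
        return err
    }
}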
handle.CheckpointName contains the concrete checkpoint key after automatic
step numbering (name, name#2, ...).
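The numbering scheme can be illustrated with a small per-run counter. This is a sketch under assumptions: stepNamer is a hypothetical type, not the SDK's implementation, and the continuation to name#3 and beyond is inferred from the "name, name#2, ..." pattern.

```go
package main

import "fmt"

// stepNamer is a hypothetical illustration of automatic step numbering:
// the first use of a name keeps it as-is, later uses get a "#N" suffix.
type stepNamer struct {
	seen map[string]int
}

func newStepNamer() *stepNamer {
	return &stepNamer{seen: map[string]int{}}
}

// next returns the concrete checkpoint key for the next use of name.
func (n *stepNamer) next(name string) string {
	n.seen[name]++
	if count := n.seen[name]; count > 1 {
		return fmt.Sprintf("%s#%d", name, count)
	}
	return name
}

func main() {
	n := newStepNamer()
	fmt.Println(n.next("agent-turn")) // agent-turn
	fmt.Println(n.next("agent-turn")) // agent-turn#2
	fmt.Println(n.next("fetch-data")) // fetch-data
}
```

A scheme like this keeps checkpoint keys stable only if a handler calls its steps in the same order on every replay.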
absurd.SleepFor(ctx, stepName, d) / absurd.SleepUntil(ctx, stepName, wakeAt)
Suspend the task for a duration or until an absolute time.
if err := absurd.SleepFor(ctx, "cooldown", time.Hour); err != nil {
    return err
}

if err := absurd.SleepUntil(
    ctx,
    "deadline",
    time.Date(2025, 12, 31, 0, 0, 0, 0, time.UTC),
); err != nil {
    return err
}
absurd.AwaitEvent(ctx, eventName, options...)
Suspend until a named event is emitted. Returns the event payload.
type ShipmentEvent struct {
    TrackingNumber string `json:"tracking_number"`
}

shipment, err := absurd.AwaitEvent[ShipmentEvent](
    ctx,
    "order.shipped",
    absurd.AwaitEventOptions{Timeout: 24 * time.Hour},
)
if err != nil {
    return err
}
log.Println(shipment.TrackingNumber)
task.AwaitTaskResult(ctx, queueName, taskID, options...)
Durably wait for another task's terminal result from inside a task handler. This must target a different queue than the current task's queue.
child, err := app.Spawn(
    ctx,
    "child-task",
    map[string]any{},
    absurd.SpawnOptions{QueueName: "child-workers"},
)
if err != nil {
    return err
}

task := absurd.MustTaskContext(ctx)
childResult, err := task.AwaitTaskResult(
    ctx,
    "child-workers",
    child.TaskID,
    absurd.AwaitTaskResultOptions{Timeout: time.Minute},
)
if err != nil {
    return err
}
log.Println(childResult.State)
absurd.Heartbeat(ctx, d)
Extend the current run lease.
if err := absurd.Heartbeat(ctx, 5*time.Minute); err != nil {
    return err
}
absurd.EmitEvent(ctx, eventName, payload)
Emit an event on the current task's queue.
if err := absurd.EmitEvent(
    ctx,
    "order.completed",
    map[string]any{"order_id": "42"},
); err != nil {
    return err
}
Events
Emit events from outside a task handler:
err := app.EmitEvent(
    context.Background(),
    app.QueueName(),
    "order.shipped",
    map[string]any{"tracking_number": "XYZ"},
)
Cancellation
err := app.CancelTask(ctx, app.QueueName(), taskID)
Running tasks notice cancellation at the next durable operation such as a checkpoint write, event wait, or heartbeat.
Retrying Failed Tasks
result, err := app.RetryTask(
    ctx,
    app.QueueName(),
    taskID,
    absurd.RetryTaskOptions{
        MaxAttempts: 5,
        SpawnNew:    true,
    },
)
Queue Management
if err := app.CreateQueue(ctx, "emails"); err != nil {
    log.Fatal(err)
}

if err := app.DropQueue(ctx, "emails"); err != nil {
    log.Fatal(err)
}

queues, err := app.ListQueues(ctx)
Running Workers
Continuous worker
workerCtx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := app.RunWorker(workerCtx, absurd.WorkerOptions{
    WorkerID:     "web-1",
    ClaimTimeout: 2 * time.Minute,
    Concurrency:  4,
    BatchSize:    4,
    PollInterval: 250 * time.Millisecond,
    OnError: func(err error) {
        log.Printf("worker error: %v", err)
    },
}); err != nil && !errors.Is(err, context.Canceled) {
    log.Fatal(err)
}
Graceful shutdown is controlled by canceling the context you passed to
RunWorker().
Single-batch processing
err := app.WorkBatch(ctx, absurd.WorkBatchOptions{
    WorkerID:     "cron-1",
    ClaimTimeout: 2 * time.Minute,
    BatchSize:    10,
})
Hooks
BeforeSpawn
Called before every Spawn(). Use it to inject headers or modify options.
app, err := absurd.New(absurd.Options{
    Hooks: absurd.Hooks{
        BeforeSpawn: func(
            taskName string,
            params any,
            options absurd.SpawnOptions,
        ) (absurd.SpawnOptions, error) {
            if options.Headers == nil {
                options.Headers = map[string]any{}
            }
            options.Headers["trace_id"] = "trace-123"
            return options, nil
        },
    },
})
WrapTaskExecution
Wrap task execution for tracing, logging, or context propagation.
app, err := absurd.New(absurd.Options{
    Hooks: absurd.Hooks{
        WrapTaskExecution: func(
            task *absurd.TaskContext,
            execute func() (any, error),
        ) (any, error) {
            traceID := task.Headers()["trace_id"]
            log.Printf("starting task %s trace=%v", task.TaskID(), traceID)
            return execute()
        },
    },
})
Error Types
| Error | Description |
|---|---|
| absurd.ErrNoTaskContext | No active task context in the provided context.Context |
| absurd.ErrTaskNotFound | Task result lookup could not find the task |
| *absurd.TimeoutError | Returned by AwaitEvent() / AwaitTaskResult() on timeout |
TimeoutError also implements Timeout() bool.
Closing
defer app.Close()
If the client opened its own database handle, Close() also closes it.
Retry Strategies
| Field | Type | Description |
|---|---|---|
| Kind | string | "fixed", "exponential", or "none" |
| BaseSeconds | float64 | Base delay |
| Factor | float64 | Exponential multiplier |
| MaxSeconds | float64 | Maximum retry delay |
&absurd.RetryStrategy{
    Kind:        "exponential",
    BaseSeconds: 1,
    Factor:      2,
    MaxSeconds:  300,
}
Cancellation Policies
| Field | Type | Description |
|---|---|---|
| MaxDuration | int64 | Cancel task after N seconds total lifetime |
| MaxDelay | int64 | Cancel task if no checkpoint is written for N seconds |