Skip to content

Quickstart

This guide walks through a small workflow that shows why Absurd is useful. We model a basic signup flow here:

  1. create a user record
  2. try to send an activation email
  3. survive a transient failure with a retry
  4. wait for an external activation event
  5. return a final result you can inspect or await

With a plain queue, you would usually end up inventing your own retry logic, status table, wake-up mechanism, and polling API. In Absurd, you write that as one task.

The exact examples in this guide live in the repository:

Prerequisites

  • PostgreSQL (14 or later)
  • pg_cron extension (optional, only needed if you want Absurd-managed cron automation for partition provisioning/cleanup/detach)
  • Node.js with native TypeScript type stripping for the TypeScript SDK
  • Python (3.11+) with uv for the Python SDK
  • Go (1.25+) for the Go SDK
  • absurdctl — see absurdctl for installation options

All examples below use absurdctl directly. If you are using uvx, replace absurdctl ... with uvx absurdctl ....

1. Install the Schema

Absurd ships as a single SQL file. Apply it to any Postgres database:

export PGDATABASE="postgresql://user:pass@localhost:5432/mydb"
absurdctl init

See absurdctl for installation details and the full CLI reference.

This creates the absurd schema with all stored procedures and helper functions. If you prefer, you can also apply sql/absurd.sql directly with psql or plug it into your migration system.

For production deployments, it is usually better to keep Absurd schema changes inside your existing database migration flow. See Database Setup and Migrations for the recommended workflow, including how to generate upgrade SQL with absurdctl migrate --dump-sql.

2. Create a Queue

Queues are logical groups of tasks. Each queue gets its own set of tables (t_, r_, c_, e_, w_ suffixed by queue name).

absurdctl create-queue default

3. Write the Worker

The worker registers a provision-user task.

The important part is the failure story:

  • create-user-record is a normal checkpointed step
  • demo-transient-outage fails once on purpose so you can see retries
  • on retry, the task replays from checkpoints instead of re-running completed work
  • the task then waits for user-activated:<user_id> and returns a final result
import { Absurd } from "absurd-sdk";

type ProvisionUserParams = {
  user_id: string;
  email: string;
};

type ActivationEvent = {
  activated_at: string;
};

const app = new Absurd({ queueName: "default" });

app.registerTask<ProvisionUserParams>(
  {
    name: "provision-user",
    defaultMaxAttempts: 5,
  },
  async (params, ctx) => {
    const user = await ctx.step("create-user-record", async () => {
      console.log(
        `[${ctx.taskID}] creating user record for ${params.user_id}`,
      );
      return {
        user_id: params.user_id,
        email: params.email,
        created_at: new Date().toISOString(),
      };
    });

    // Demo only: fail once after the first checkpoint so retry behavior
    // is visible.
    const outage = await ctx.beginStep<{ simulated: boolean }>(
      "demo-transient-outage",
    );
    if (!outage.done) {
      console.log(
        `[${ctx.taskID}] simulating a temporary email provider outage`,
      );
      await ctx.completeStep(outage, { simulated: true });
      throw new Error("temporary email provider outage");
    }

    const delivery = await ctx.step("send-activation-email", async () => {
      console.log(
        `[${ctx.taskID}] sending activation email to ${user.email}`,
      );
      return {
        sent: true,
        provider: "demo-mail",
        to: user.email,
      };
    });

    console.log(
      `[${ctx.taskID}] waiting for user-activated:${user.user_id}`,
    );
    const activation = (await ctx.awaitEvent(
      `user-activated:${user.user_id}`,
      {
        timeout: 3600,
      },
    )) as ActivationEvent;

    return {
      user_id: user.user_id,
      email: user.email,
      delivery,
      status: "active",
      activated_at: activation.activated_at,
    };
  },
);

console.log("worker listening on queue default");
await app.startWorker({ concurrency: 4 });
from datetime import datetime, timezone

from absurd_sdk import Absurd

app = Absurd(queue_name="default")


@app.register_task("provision-user", default_max_attempts=5)
def provision_user(params, ctx):
    def create_user_record():
        print(
            f"[{ctx.task_id}] creating user record for {params['user_id']}"
        )
        return {
            "user_id": params["user_id"],
            "email": params["email"],
            "created_at": datetime.now(timezone.utc).isoformat(),
        }

    user = ctx.step("create-user-record", create_user_record)

    # Demo only: fail once after the first checkpoint so the retry
    # behavior is visible.
    outage = ctx.begin_step("demo-transient-outage")
    if not outage.done:
        print(
            f"[{ctx.task_id}] simulating a temporary email provider outage"
        )
        ctx.complete_step(outage, {"simulated": True})
        raise RuntimeError("temporary email provider outage")

    def send_activation_email():
        print(
            f"[{ctx.task_id}] sending activation email to {user['email']}"
        )
        return {
            "sent": True,
            "provider": "demo-mail",
            "to": user["email"],
        }

    delivery = ctx.step("send-activation-email", send_activation_email)

    print(f"[{ctx.task_id}] waiting for user-activated:{user['user_id']}")
    activation = ctx.await_event(
        f"user-activated:{user['user_id']}",
        timeout=3600,
    )

    return {
        "user_id": user["user_id"],
        "email": user["email"],
        "delivery": delivery,
        "status": "active",
        "activated_at": activation["activated_at"],
    }


print("worker listening on queue default")
app.start_worker()
package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/earendil-works/absurd/sdks/go/absurd"
    _ "github.com/jackc/pgx/v5/stdlib"
)

type ProvisionUserParams struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

type UserRecord struct {
    UserID    string    `json:"user_id"`
    Email     string    `json:"email"`
    CreatedAt time.Time `json:"created_at"`
}

type OutageState struct {
    Simulated bool `json:"simulated"`
}

type DeliveryResult struct {
    Sent     bool   `json:"sent"`
    Provider string `json:"provider"`
    To       string `json:"to"`
}

type ActivationEvent struct {
    ActivatedAt time.Time `json:"activated_at"`
}

type ProvisionUserResult struct {
    UserID      string         `json:"user_id"`
    Email       string         `json:"email"`
    Delivery    DeliveryResult `json:"delivery"`
    Status      string         `json:"status"`
    ActivatedAt time.Time      `json:"activated_at"`
}

var provisionUserTask = absurd.Task(
    "provision-user",
    func(
        ctx context.Context,
        params ProvisionUserParams,
    ) (ProvisionUserResult, error) {
        task := absurd.MustTaskContext(ctx)

        user, err := absurd.Step(
            ctx,
            "create-user-record",
            func(ctx context.Context) (UserRecord, error) {
                log.Printf(
                    "[%s] creating user record for %s",
                    task.TaskID(),
                    params.UserID,
                )
                return UserRecord{
                    UserID:    params.UserID,
                    Email:     params.Email,
                    CreatedAt: time.Now().UTC(),
                }, nil
            },
        )
        if err != nil {
            return ProvisionUserResult{}, err
        }

        // Demo only: fail once after the first checkpoint so the retry
        // behavior is visible.
        outage, err := absurd.BeginStep[OutageState](
            ctx,
            "demo-transient-outage",
        )
        if err != nil {
            return ProvisionUserResult{}, err
        }
        if !outage.Done {
            log.Printf(
                "[%s] simulating a temporary email provider outage",
                task.TaskID(),
            )
            if _, err := outage.CompleteStep(
                ctx,
                OutageState{Simulated: true},
            ); err != nil {
                return ProvisionUserResult{}, err
            }
            return ProvisionUserResult{}, errors.New(
                "temporary email provider outage",
            )
        }

        delivery, err := absurd.Step(
            ctx,
            "send-activation-email",
            func(ctx context.Context) (DeliveryResult, error) {
                log.Printf(
                    "[%s] sending activation email to %s",
                    task.TaskID(),
                    user.Email,
                )
                return DeliveryResult{
                    Sent:     true,
                    Provider: "demo-mail",
                    To:       user.Email,
                }, nil
            },
        )
        if err != nil {
            return ProvisionUserResult{}, err
        }

        eventName := fmt.Sprintf("user-activated:%s", user.UserID)
        log.Printf("[%s] waiting for %s", task.TaskID(), eventName)

        activation, err := absurd.AwaitEvent[ActivationEvent](
            ctx,
            eventName,
            absurd.AwaitEventOptions{Timeout: time.Hour},
        )
        if err != nil {
            return ProvisionUserResult{}, err
        }

        return ProvisionUserResult{
            UserID:      user.UserID,
            Email:       user.Email,
            Delivery:    delivery,
            Status:      "active",
            ActivatedAt: activation.ActivatedAt,
        }, nil
    },
    absurd.TaskOptions{DefaultMaxAttempts: 5},
)

func main() {
    app, err := absurd.New(absurd.Options{
        QueueName:  "default",
        DriverName: "pgx",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer app.Close()

    app.MustRegister(provisionUserTask)

    log.Println("worker listening on queue default")
    if err := app.RunWorker(
        context.Background(),
        absurd.WorkerOptions{Concurrency: 4},
    ); err != nil {
        log.Fatal(err)
    }
}

Run one of the repository examples in a terminal:

cd sdks/typescript
npm install
node examples/quickstart/worker.ts
cd sdks/python
uv run examples/quickstart/worker.py
cd sdks/go/absurd
go run ./examples/quickstart/worker

4. Spawn a Task

In another terminal, spawn a task. The spawn result gives you a task ID you can store, inspect later, or await.

import { Absurd } from "absurd-sdk";

const shouldAwait = process.argv.includes("--await");
const args = process.argv.slice(2).filter((arg) => arg !== "--await");
const userID = args[0] ?? "alice";
const email = args[1] ?? `${userID}@example.com`;

const queueName = "default";
const app = new Absurd({ queueName });

const spawned = await app.spawn(
  "provision-user",
  {
    user_id: userID,
    email,
  },
  { queue: queueName },
);

console.log("spawned:", spawned);
console.log(
  "current snapshot:",
  await app.fetchTaskResult(spawned.taskID),
);

if (shouldAwait) {
  console.log(
    `waiting for completion; emit user-activated:${userID} ` +
      "on queue default",
  );
  console.log(
    "final snapshot:",
    await app.awaitTaskResult(spawned.taskID, { timeout: 300 }),
  );
}

await app.close();
import sys

from absurd_sdk import Absurd

should_await = "--await" in sys.argv
args = [arg for arg in sys.argv[1:] if arg != "--await"]
user_id = args[0] if len(args) > 0 else "alice"
email = args[1] if len(args) > 1 else f"{user_id}@example.com"

queue_name = "default"
app = Absurd(queue_name=queue_name)

spawned = app.spawn(
    "provision-user",
    {
        "user_id": user_id,
        "email": email,
    },
    queue=queue_name,
)

print("spawned:", spawned)
print("current snapshot:", app.fetch_task_result(spawned["task_id"]))

if should_await:
    print(
        f"waiting for completion; emit user-activated:{user_id} "
        "on queue default"
    )
    print(
        "final snapshot:",
        app.await_task_result(spawned["task_id"], timeout=300),
    )

app.close()
package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "time"

    "github.com/earendil-works/absurd/sdks/go/absurd"
    _ "github.com/jackc/pgx/v5/stdlib"
)

type ProvisionUserParams struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func main() {
    var shouldAwait bool
    flag.BoolVar(&shouldAwait, "await", false, "wait for task completion")
    flag.Parse()

    userID := "alice"
    if flag.NArg() > 0 {
        userID = flag.Arg(0)
    }

    email := fmt.Sprintf("%s@example.com", userID)
    if flag.NArg() > 1 {
        email = flag.Arg(1)
    }

    ctx := context.Background()

    app, err := absurd.New(absurd.Options{
        QueueName:  "default",
        DriverName: "pgx",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer app.Close()

    spawned, err := app.Spawn(
        ctx,
        "provision-user",
        ProvisionUserParams{
            UserID: userID,
            Email:  email,
        },
        absurd.SpawnOptions{QueueName: app.QueueName()},
    )
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("spawned: %+v\n", spawned)

    snapshot, err := app.FetchTaskResult(
        ctx,
        app.QueueName(),
        spawned.TaskID,
    )
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("current snapshot: %+v\n", snapshot)

    if shouldAwait {
        fmt.Printf(
            "waiting for completion; emit user-activated:%s "+
                "on queue default\n",
            userID,
        )
        final, err := app.AwaitTaskResult(
            ctx,
            app.QueueName(),
            spawned.TaskID,
            absurd.AwaitTaskResultOptions{Timeout: 5 * time.Minute},
        )
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("final snapshot: %+v\n", final)
    }
}

!!! warning "Spawning from a separate process"

The client snippets above do **not** register `provision-user` locally.
For safety, SDK `spawn()` requires an explicit queue for unregistered
tasks.

Also note: when a task is unregistered in the spawning process, task-level
registration defaults (for example `default_max_attempts`) are not loaded.
Retry/cancellation settings come from explicit spawn options (or SDK client
defaults).
absurdctl spawn-task --queue default provision-user \
  -P user_id=alice \
  -P email=alice@example.com

Run one of the repository clients:

cd sdks/typescript
node examples/quickstart/client.ts alice alice@example.com
cd sdks/python
uv run examples/quickstart/client.py alice alice@example.com
cd sdks/go/absurd
go run ./examples/quickstart/client alice alice@example.com

If you want to block until the task finishes, pass --await and then emit the activation event from another terminal.

5. Wake the Task Up

The task is now suspended in awaitEvent(), waiting for user-activated:alice. Wake it up like this:

absurdctl emit-event --queue default user-activated:alice \
  -P activated_at=2026-04-02T12:00:00Z

At that point the waiting task resumes, returns its final value, and moves to a terminal state.

6. Inspect or Await the Result

Most applications will just keep the returned task ID and continue. But when you want synchronous behavior in a script, test, or shell, you can also inspect or await the task result.

The Go, Python, and TypeScript clients use the same basic flow: spawn the task, fetch the current snapshot, then await the terminal result after you emit the activation event from another terminal.

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/earendil-works/absurd/sdks/go/absurd"
    _ "github.com/jackc/pgx/v5/stdlib"
)

type ProvisionUserParams struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

func main() {
    ctx := context.Background()

    app, err := absurd.New(absurd.Options{
        QueueName:  "default",
        DriverName: "pgx",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer app.Close()

    spawned, err := app.Spawn(
        ctx,
        "provision-user",
        ProvisionUserParams{
            UserID: "bob",
            Email:  "bob@example.com",
        },
        absurd.SpawnOptions{QueueName: app.QueueName()},
    )
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("%+v\n", spawned)

    snapshot, err := app.FetchTaskResult(
        ctx,
        app.QueueName(),
        spawned.TaskID,
    )
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%+v\n", snapshot)

    final, err := app.AwaitTaskResult(
        ctx,
        app.QueueName(),
        spawned.TaskID,
        absurd.AwaitTaskResultOptions{Timeout: 5 * time.Minute},
    )
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%+v\n", final)
}
>>> from absurd_sdk import Absurd
>>> queue_name = "default"
>>> app = Absurd(queue_name=queue_name)
>>> spawned = app.spawn(
...     "provision-user",
...     {"user_id": "bob", "email": "bob@example.com"},
...     queue=queue_name,
... )
>>> spawned
{'task_id': '019...', 'run_id': '019...', 'attempt': 1}
>>> app.fetch_task_result(spawned["task_id"])
TaskResultSnapshot(state='pending')
>>> app.await_task_result(spawned["task_id"], timeout=300)
TaskResultSnapshot(
  state='completed',
  result={
    'user_id': 'bob',
    'email': 'bob@example.com',
    'delivery': {
      'sent': True,
      'provider': 'demo-mail',
      'to': 'bob@example.com',
    },
    'status': 'active',
    'activated_at': '2026-04-02T12:00:00Z',
  },
)
import { Absurd } from "absurd-sdk";

const queueName = "default";
const app = new Absurd({ queueName });

const spawned = await app.spawn(
  "provision-user",
  {
    user_id: "bob",
    email: "bob@example.com",
  },
  { queue: queueName },
);

console.log(spawned);
console.log(await app.fetchTaskResult(spawned.taskID));
console.log(await app.awaitTaskResult(spawned.taskID, { timeout: 300 }));

await app.close();

7. What Just Happened?

This example shows the main Absurd model:

  • Retries are part of the normal flow. The demo task intentionally fails once after the first checkpoint.
  • Completed steps are not re-run. On retry, create-user-record is loaded from Postgres instead of executing again.
  • Waiting is durable. awaitEvent() suspends the task without losing state.
  • Results are queryable. You can fetch the current snapshot or await the terminal result from another process.

That is the core pitch of Absurd: write one workflow in straight-line code, while Postgres keeps the checkpoints, retries, and wake-up state.

Next Steps