Aviary

Python steps

Run workflow steps in Python. A TypeScript workflow's ctx.remote dispatches to a Python worker over BullMQ/Redis, and the result flows back — one workflow, steps split across languages.

A step doesn't have to run in Node. The durable-worker Python SDK registers step handlers by name and runs them over the same Redis queues the BullMQ transport uses — so a TypeScript workflow can ctx.remote a step implemented in Python.

pip install "durable-worker[redis]"

A Python worker

import asyncio
from durable_worker import Worker, FatalError
from durable_worker.redis_runner import run_redis_worker

worker = Worker()

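@worker.step("payments.charge-card")
async def charge(data):
    # `stripe` stands in for your own async payments client; it is not imported here.
    res = await stripe.charge(data["orderId"], data["amountCents"])
    return {"chargeId": res.id}

async def main():
    await run_redis_worker(
        worker,
        connection="redis://localhost:6379",
    )
    # Keep the process alive while the worker consumes tasks.
    await asyncio.Event().wait()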

asyncio.run(main())

The handler's argument is the step input (schema-validated by the engine); its return value is the output. Raise FatalError for a non-retryable failure (e.g. a declined card); any other exception is retryable and the engine applies the step's retry policy.
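
The retryable/non-retryable split can be tried out without a broker. In this minimal sketch, FatalError is a local stand-in for the SDK class so the snippet runs on its own, and CardDeclined, FakeGateway and its charge method are hypothetical placeholders for your payments client:

```python
import asyncio


class FatalError(Exception):
    """Local stand-in for durable_worker.FatalError so the sketch runs alone."""


class CardDeclined(Exception):
    """Hypothetical error a payments client raises for a declined card."""


class FakeGateway:
    """Hypothetical async payments client; declines any non-positive amount."""

    async def charge(self, order_id, amount_cents):
        if amount_cents <= 0:
            raise CardDeclined(f"order {order_id}: card declined")
        return {"id": f"ch_{order_id}"}


async def charge(data, gateway):
    try:
        res = await gateway.charge(data["orderId"], data["amountCents"])
    except CardDeclined as exc:
        # Retrying a declined card cannot succeed, so mark it non-retryable.
        raise FatalError(str(exc)) from exc
    # Any other exception (timeouts, connection resets) propagates unchanged
    # and is treated as retryable under the step's retry policy.
    return {"chargeId": res["id"]}
```

The design choice is to translate only the errors you know are permanent; everything else is left alone so the retry policy can do its job.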

Connecting to the control plane

There is no direct connection between a Python worker and your NestJS app — they never open a socket to each other, and the worker never talks to the dashboard or the database. Everything goes through the broker (Redis). The engine and the workers are decoupled producers and consumers on per-step-name queues:

  NestJS engine                              Redis (the broker)                    Python worker
  ─────────────                              ──────────────────                    ─────────────
  ctx.remote(step) ───dispatch───▶  durable-tasks-<name>[@<partition>]  ───consume──▶  @worker.step(name)
  resume ctx.remote ◀──consume────  durable-results                     ◀───add──────  return / raise

So "connecting to the control plane" just means pointing the worker at the same broker the engine's BullMQTransport uses, and registering a handler under the exact same step name. Two things have to line up:

| Must match | Engine side (BullMQTransport) | Worker side (run_redis_worker) | Why |
| --- | --- | --- | --- |
| connection | connection: { host, port, … } | connection="redis://host:port" | Same Redis instance: that's the meeting point. |
| prefix | prefix (default durable) | prefix (default durable) | Namespaces the queue keys: <prefix>-tasks-<name> and <prefix>-results. |

Each registered step name gets its own queue — there's no separate group to declare or match. Sharing one broker across multiple isolated deployments (e.g. a dev cluster and a developer's laptop)? Set the same partition on both sides — remoteStep({ ..., partition }) engine-side, Worker(partition=...) (or run_redis_worker(worker, partition=...)) worker-side — and every queue name is suffixed <name>@<partition> so the two deployments never cross-consume.

Match those and a ctx.remote(chargeStep, …) is delivered to whichever worker registered chargeStep.name. Nothing else to register — there's no handshake or service discovery; the queue name is the routing.
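
Since the queue name is the routing, it can help to compute it while debugging a mismatch. The helpers below are hypothetical (they are not part of the SDK) and only mirror the naming pattern described above; the results queue is left unsuffixed, following the diagram:

```python
from typing import Optional


def tasks_queue(name: str, prefix: str = "durable", partition: Optional[str] = None) -> str:
    """Per-step tasks queue: <prefix>-tasks-<name>[@<partition>]."""
    queue = f"{prefix}-tasks-{name}"
    if partition:
        queue += f"@{partition}"
    return queue


def results_queue(prefix: str = "durable") -> str:
    """Shared results queue: <prefix>-results."""
    return f"{prefix}-results"


if __name__ == "__main__":
    print(tasks_queue("payments.charge-card"))
    print(tasks_queue("payments.charge-card", partition="dev"))
    print(results_queue())
```

If the engine and the worker produce different strings for the same step, the task will sit in a queue nobody consumes.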

A few properties that follow from this being queue-based:

  • At-least-once delivery. A task can be redelivered (a worker crash, a reconnect), so handlers should be idempotent — key external side effects on stepId (or your own dedupe key). The engine already dedupes a step that completed, but a handler can be re-run before its result is recorded.
  • Workers are stateless and horizontal. Run as many copies as you want for a given step; Redis hands each task to one of them. Scale a slow step by adding workers, not by touching the workflow.
  • The engine owns correlation. Every result carries the stepId it answers; the engine matches it to the suspended ctx.remote and resumes the run. The worker never needs the run's state.
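
The at-least-once point is the one most likely to bite, so here is a minimal sketch of keying a side effect on stepId. ChargeLedger is a hypothetical in-memory stand-in for a durable store, and how your handler obtains the stepId is not shown; the sketch simply takes it as a parameter:

```python
class ChargeLedger:
    """In-memory stand-in for a durable dedupe store (a database table in practice)."""

    def __init__(self):
        self._by_step_id = {}
        self.calls = 0  # counts real side effects, for demonstration only

    def charge_once(self, step_id, order_id, amount_cents):
        # A redelivered task carries the same stepId, so return the recorded
        # outcome instead of charging a second time.
        if step_id in self._by_step_id:
            return self._by_step_id[step_id]
        self.calls += 1  # the real charge would happen here
        outcome = {"chargeId": f"ch_{order_id}", "amountCents": amount_cents}
        self._by_step_id[step_id] = outcome
        return outcome


if __name__ == "__main__":
    ledger = ChargeLedger()
    ledger.charge_once("wrun_8Kb2:1", "o1", 4200)
    ledger.charge_once("wrun_8Kb2:1", "o1", 4200)  # redelivery: no second charge
    print(ledger.calls)
```

With several workers the check and the write must be atomic, so a real store would rely on something like a unique constraint on the dedupe key rather than a dictionary lookup.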

Authentication

The connection is a standard Redis URL, so credentials and TLS go in the URL:

await run_redis_worker(
    worker,
    connection="rediss://default:my-password@my-redis.example.com:6379",
)

Use redis:// for plaintext and rediss:// for TLS. This is the same Redis the engine connects to — if your NestJS app authenticates with a username/password, the worker uses the same ones.
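
To check what a given URL actually carries before handing it to the runner, the standard library can split it. describe_redis_url is a hypothetical helper for inspection only, and the fallback to port 6379 is an assumption of this sketch:

```python
from urllib.parse import urlsplit


def describe_redis_url(url: str) -> dict:
    """Split a redis:// or rediss:// URL into its connection parts."""
    parts = urlsplit(url)
    if parts.scheme not in ("redis", "rediss"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    return {
        "tls": parts.scheme == "rediss",  # rediss:// means TLS
        "username": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port or 6379,  # assumed default when the URL omits it
    }


if __name__ == "__main__":
    print(describe_redis_url("rediss://default:my-password@my-redis.example.com:6379"))
```

A password containing characters such as @ or / has to be percent-encoded in the URL (urllib.parse.quote with safe="" does this), otherwise the split lands in the wrong place.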

run_redis_worker options

| Option | Default | Description |
| --- | --- | --- |
| worker | | The Worker whose handlers run. |
| partition | None | Optional isolation suffix: appends @<partition> to every one of this worker's per-name queues. |
| connection | redis://localhost:6379 | Redis URL (the broker the engine also uses). |
| prefix | durable | Queue-key namespace; must match the engine's BullMQTransport prefix. |

Other transports

The Redis worker above is one of three runners; pick the one matching the engine's transport. All of them run the same handlers (@worker.step(...)); only the runner that feeds them changes. Unlike the Redis runner, SQS and the SQL transport still route by a single declared group (below). Otherwise the same rule applies: the worker's prefix and queue or table namespace must match the engine's.

AWS SQS

For an engine on the SQS transport. Install the extra: pip install "durable-worker[sqs]".

from durable_worker.sqs_runner import run_sqs_worker

run_sqs_worker(worker, group="payments", region="us-east-1")  # blocks; long-polls the tasks queue

Queue names default to <prefix>-tasks-<group> and <prefix>-results (resolved via GetQueueUrl); pass tasks_queue_url / results_queue_url to reuse queues you already own, or client= to inject a preconfigured boto3 client. SQS has no push model, so this is a blocking loop — run it in its own process and pass a threading.Event as stop to shut it down.
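
The shape of a blocking loop that honours a threading.Event can be sketched as follows. This is not run_sqs_worker's code: poll_until_stopped, receive and handle are hypothetical names, used only to show why a stop event gives a clean shutdown between batches:

```python
import threading


def poll_until_stopped(receive, handle, stop: threading.Event, idle_wait: float = 0.01) -> int:
    """Blocking loop: fetch a batch, handle each message, exit once stop is set."""
    handled = 0
    while not stop.is_set():
        batch = receive()
        if not batch:
            # Nothing to do; wait briefly, but wake immediately if stop is set.
            stop.wait(idle_wait)
            continue
        for message in batch:
            handle(message)
            handled += 1
    return handled
```

In a real process you would typically set the event from a signal handler (for example on SIGTERM), so the loop finishes the batch in hand and then returns.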

SQL (database)

For an engine on the broker-less SQL transport — your Python workers poll the same Postgres or MySQL tables, no broker at all. Install the driver extra: pip install "durable-worker[postgres]" or "durable-worker[mysql]".

from durable_worker.db_runner import run_db_worker

await run_db_worker(
    worker,
    group="pipeline",
    dialect="postgres",                                   # or "mysql"
    dsn="postgres://user:pass@localhost:5432/app",
)

It implements the documented table + claim contract — claiming tasks with SELECT … FOR UPDATE SKIP LOCKED, running the handler, writing a result row, deleting the task. Because both libraries target that one spec, a Node engine and a Python worker share the same two tables (the prefix must match). Requires Postgres 9.5+ or MySQL 8+ (SQLite has no SKIP LOCKED).

The wire protocol

The contract is plain JSON, identical across languages. The orchestrator dispatches a task:

{
  "runId": "wrun_8Kb2", "seq": 1, "name": "payments.charge-card",
  "stepId": "wrun_8Kb2:1", "group": "payments",
  "input": { "orderId": "o1", "amountCents": 4200 }, "attempt": 1
}

The worker replies with a result:

{ "runId": "wrun_8Kb2", "seq": 1, "stepId": "wrun_8Kb2:1",
  "status": "completed", "output": { "chargeId": "ch_1" },
  "startedAt": 1718900000123 }

startedAt is the epoch-ms moment the worker picked the task up. The engine subtracts its own dispatch time from it to report queue-wait (how long the step sat in the queue before a worker was free), so a Python step's queue time shows up in the dashboard just like a Node one. The SDK stamps it for you.
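
The queue-wait arithmetic is just the worker's pickup time minus the engine's dispatch time. In this sketch, dispatched_at_ms is a hypothetical name for the engine-side timestamp, and clamping at zero to absorb clock skew between machines is an assumption, not documented engine behaviour:

```python
def queue_wait_ms(dispatched_at_ms: int, started_at_ms: int) -> int:
    """Milliseconds a task waited in the queue before a worker picked it up."""
    # Assumption: clamp at zero, since engine and worker clocks can differ slightly.
    return max(0, started_at_ms - dispatched_at_ms)


if __name__ == "__main__":
    # Dispatched at ...000, picked up at ...123.
    print(queue_wait_ms(1718900000000, 1718900000123))
```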

Worker.process_task(task) -> result is the pure, transport-agnostic core (no broker needed — ideal for unit tests). Because the contract is language-neutral, a Go or Rust worker can implement the same thing — Python is just the first SDK.
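
To make the contract concrete, here is a toy, success-path-only worker that turns the task above into the result above. ToyWorker is a stand-in written for this page, not the SDK's Worker; it omits failures, retries and the step context, and fake_charge is a hypothetical handler:

```python
import time


class ToyWorker:
    """Minimal stand-in for the SDK's Worker; handles the success path only."""

    def __init__(self):
        self._handlers = {}

    def step(self, name):
        def register(fn):
            self._handlers[name] = fn
            return fn
        return register

    def process_task(self, task):
        started_at = int(time.time() * 1000)    # epoch ms, stamped on pickup
        handler = self._handlers[task["name"]]  # KeyError if nothing is registered
        output = handler(task["input"])
        # Echo the correlation fields so the engine can match the result.
        return {
            "runId": task["runId"],
            "seq": task["seq"],
            "stepId": task["stepId"],
            "status": "completed",
            "output": output,
            "startedAt": started_at,
        }


toy = ToyWorker()


@toy.step("payments.charge-card")
def fake_charge(data):
    return {"chargeId": "ch_" + data["orderId"]}
```

A worker in another language needs the same three things: a name-to-handler lookup, the echoed runId / seq / stepId, and a startedAt stamp.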

Cooperative cancellation

When a run is cancelled — engine.cancel(runId) on the NestJS side — a step that's already running in a worker can't be force-killed from outside the process. Instead the orchestrator broadcasts a control message and the handler bails out on its own at a safe point. This works cross-language: the control plane is the same one the TypeScript engine uses, so a Python worker observes engine.cancel just like a Node one.

The mechanics:

  • The orchestrator publishes {"kind": "cancel", "runId": ...} on its control channel (<prefix>-control, the same channel the TS control plane uses).
  • run_redis_worker subscribes to that channel and feeds a CancellationRegistry — a thread-safe set of cancelled run ids. It does this automatically; one is created for you if you don't pass your own.
  • The registry is surfaced to your handler through the step context as ctx.cancelled (a bool) and ctx.raise_if_cancelled() (raises Cancelled). A long handler checks it at safe points and returns or raises, so it stops doing work whose result is going to be discarded.
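
The mechanics above can be sketched with the standard library alone. ToyCancellationRegistry is an illustration, not the SDK's CancellationRegistry; the method names and the is_cancelled(run_id) signature are assumptions made for the sketch:

```python
import json
import threading


class ToyCancellationRegistry:
    """Thread-safe set of cancelled run ids, fed by control messages."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cancelled = set()

    def cancel(self, run_id):
        with self._lock:
            self._cancelled.add(run_id)

    def is_cancelled(self, run_id):
        with self._lock:
            return run_id in self._cancelled

    def on_control_message(self, raw):
        # Only {"kind": "cancel", "runId": ...} is acted on; anything else is ignored.
        msg = json.loads(raw)
        if msg.get("kind") == "cancel" and "runId" in msg:
            self.cancel(msg["runId"])
```

The lock matters because the subscriber that feeds the set and the handlers that read it may run on different threads.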

Opt in by declaring the second ctx parameter on your handler (same context that carries the step events), then poll it inside the loop:

from durable_worker import Worker, Cancelled

worker = Worker()

@worker.step("pipeline.transcode")
def transcode(data, ctx):
    out = []
    for chunk in data["chunks"]:
        # Bail out before starting more expensive work — the run was cancelled,
        # so finishing the remaining chunks would just be thrown away.
        if ctx.cancelled:
            ctx.warn("cancelled — stopping early", {"done": len(out)})
            return {"partial": out, "cancelled": True}

        out.append(encode_chunk(chunk))
    return {"chunks": out}

ctx.raise_if_cancelled() is the one-liner equivalent when you'd rather abort than return a partial result — it raises Cancelled (a subclass of Exception), which surfaces as a failed step:

@worker.step("pipeline.crawl")
async def crawl(data, ctx):
    results = []
    for url in data["urls"]:
        ctx.raise_if_cancelled()   # raises Cancelled the moment the run is cancelled
        results.append(await fetch(url))
    return {"results": results}

run_redis_worker passes registry.is_cancelled into process_task for you, so ctx.cancelled reflects cancellation automatically. If you want to drive a registry yourself (e.g. you run the worker on a transport without pub/sub, or you want to share one registry across runners), construct it and pass it in:

from durable_worker.cancellation import CancellationRegistry

cancellation = CancellationRegistry()

await run_redis_worker(
    worker,
    connection="redis://localhost:6379",
    cancellation=cancellation,
)

Cancellation is cooperative by design: a handler that never checks ctx.cancelled runs to completion, and the engine simply discards its result. Sprinkle the check at the natural boundaries of a long handler (between chunks, before an expensive call) so it can let go promptly.
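
Because the check is just an attribute read, a handler's early exit is easy to unit-test with a fake context. In this sketch, FakeCtx and the local Cancelled class are test doubles written for illustration (they expose only the two members described above), and upper_all is a hypothetical handler:

```python
class Cancelled(Exception):
    """Local stand-in for durable_worker.Cancelled."""


class FakeCtx:
    """Test double: reports 'not cancelled' for N checks, then flips to cancelled."""

    def __init__(self, cancel_after_checks):
        self._remaining = cancel_after_checks

    @property
    def cancelled(self):
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True

    def raise_if_cancelled(self):
        if self.cancelled:
            raise Cancelled()


def upper_all(data, ctx):
    out = []
    for item in data["items"]:
        # Check at the boundary between units of work.
        if ctx.cancelled:
            return {"partial": out, "cancelled": True}
        out.append(item.upper())
    return {"items": out}
```

Calling upper_all with FakeCtx(2) processes two items and then returns the partial result, which is the behaviour you would assert on in a test.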

The control-channel subscription is best-effort: it needs the redis async client for pub/sub. If that client isn't available, the runner logs the fact and continues; steps still run, but cancellation won't be observed.

Multi-broker reply routing

A worker can consume more than one broker at once — typically for failover, where the engine dispatches the same step over two transports and whichever broker is healthy delivers it. The catch: the result has to go back on the same broker the task arrived on, otherwise the engine waiting on the other transport never sees it. The orchestrator stamps each task with the id of the transport it was dispatched on (task["transport"]), and reply_target does the lookup:

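from durable_worker import Worker
from durable_worker.cancellation import CancellationRegistry
from durable_worker.routing import reply_target

worker = Worker()                  # handlers registered with @worker.step(...) as usual
registry = CancellationRegistry()  # fed by whichever control channel you subscribe to

# One reply target per broker id (e.g. a results queue / publisher per transport).
# primary_results and failover_results are your own publisher objects, created elsewhere.
targets = {
    "redis-primary": primary_results,
    "redis-failover": failover_results,
}

def on_task(task):
    result = worker.process_task(task, is_cancelled=registry.is_cancelled)
    target = reply_target(task, targets)  # picks the broker the task came in on
    target.send(result)                   # reply where you were told, never pick one yourself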

reply_target(task, targets) returns the entry keyed by task["transport"]. It falls back to the lone target when the id is absent (a single transport) or unknown, and returns None only when targets is empty — so the single-broker case keeps working with a one-entry mapping and no special casing.
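
The lookup rules can be written out as a short function. pick_reply_target is a hypothetical re-statement of the behaviour described above, not the SDK's source; the case of several targets with a missing or unknown id isn't spelled out here, so raising LookupError in that case is purely an assumption of this sketch:

```python
def pick_reply_target(task, targets):
    """Choose where a result goes, keyed by the transport the task arrived on."""
    if not targets:
        return None
    transport_id = task.get("transport")
    if transport_id in targets:
        return targets[transport_id]
    if len(targets) == 1:
        # Absent or unknown id with a single broker: use the lone target.
        return next(iter(targets.values()))
    # Assumption: with several targets and no usable id, refuse to guess.
    raise LookupError(f"no reply target for transport {transport_id!r}")


if __name__ == "__main__":
    targets = {"redis-primary": "primary", "redis-failover": "failover"}
    print(pick_reply_target({"transport": "redis-failover"}, targets))
```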

That said, the simpler pattern is one runner per broker: start a run_redis_worker (or SQS / DB runner) for each broker, each consuming and replying on its own transport. Each runner already publishes its results on the broker it consumes, so there's nothing to route — no reply_target needed. Reach for reply_target only when a single worker loop genuinely multiplexes several brokers and has to decide, per task, where the reply goes.
