Python steps
Run workflow steps in Python. A TypeScript workflow's ctx.remote dispatches to a Python worker over BullMQ/Redis, and the result flows back — one workflow, steps split across languages.
A step doesn't have to run in Node. The durable-worker Python SDK registers step handlers by
name and runs them over the same Redis queues the BullMQ transport uses
— so a TypeScript workflow can ctx.remote a step implemented in Python.
    pip install "durable-worker[redis]"

A Python worker

    import asyncio

    from durable_worker import Worker, FatalError
    from durable_worker.redis_runner import run_redis_worker

    worker = Worker()

    # Register the handler under the exact step name the engine dispatches.
    @worker.step("payments.charge-card")
    async def charge(data):
        # `stripe` stands in for your payment client; it is not defined in this snippet.
        res = await stripe.charge(data["orderId"], data["amountCents"])
        return {"chargeId": res.id}

    async def main():
        await run_redis_worker(
            worker,
            connection="redis://localhost:6379",
        )
        # Block forever so the process stays up.
        await asyncio.Event().wait()

    asyncio.run(main())

The handler's argument is the step input (schema-validated by the engine); its return value is
the output. Raise FatalError for a non-retryable failure (e.g. a declined card); any other
exception is retryable and the engine applies the step's retry policy.
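
The fatal-versus-retryable split can be sketched without the SDK installed. This is a minimal illustration, not the library's implementation: `FatalError` below is a local stand-in for `durable_worker.FatalError`, and `CardDeclined`, `fake_charge` and `is_retryable` are hypothetical names introduced only for the example.

```python
class FatalError(Exception):
    """Local stand-in for durable_worker.FatalError so the sketch runs without the SDK."""


class CardDeclined(Exception):
    """Hypothetical error raised by a payment client."""


def fake_charge(order_id, amount_cents):
    # Hypothetical payment call: declines any charge above 10,000 cents.
    if amount_cents > 10_000:
        raise CardDeclined(f"card declined for order {order_id}")
    return {"chargeId": f"ch_{order_id}"}


def charge(data):
    try:
        return fake_charge(data["orderId"], data["amountCents"])
    except CardDeclined as exc:
        # A decline will not succeed on a second attempt, so mark it non-retryable.
        raise FatalError(str(exc)) from exc
    # Any other exception (timeouts, connection resets) propagates unchanged
    # and is therefore treated as retryable.


def is_retryable(exc):
    # Mirrors the rule in the text: only FatalError opts out of retries.
    return not isinstance(exc, FatalError)
```

In a real worker you would import `FatalError` from `durable_worker` and decorate `charge` with `@worker.step(...)`; the retry decision itself stays with the engine.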
Connecting to the control plane
There is no direct connection between a Python worker and your NestJS app — they never open a socket to each other, and the worker never talks to the dashboard or the database. Everything goes through the broker (Redis). The engine and the workers are decoupled producers and consumers on per-step-name queues:

    NestJS engine                     Redis (the broker)                              Python worker
    ─────────────                     ──────────────────                              ─────────────
    ctx.remote(step)  ───dispatch───▶ durable-tasks-<name>[@<partition>] ───consume──▶ @worker.step(name)
    resume ctx.remote ◀───consume──── durable-results                    ◀────add───── return / raise

So "connecting to the control plane" just means pointing the worker at the same broker the
engine's BullMQTransport uses, and registering a handler under the exact
same step name. Two things have to line up:
| Must match | Engine side (BullMQTransport) | Worker side (run_redis_worker) | Why |
|---|---|---|---|
| connection | connection: { host, port, … } | connection="redis://host:port" | Same Redis instance — that's the meeting point. |
| prefix | prefix (default durable) | prefix (default durable) | Namespaces the queue keys: <prefix>-tasks-<name> and <prefix>-results. |
Each registered step name gets its own queue — there's no separate group to declare or match.
Sharing one broker across multiple isolated deployments (e.g. a dev cluster and a developer's
laptop)? Set the same partition on both sides — remoteStep({ ..., partition }) engine-side,
Worker(partition=...) (or run_redis_worker(worker, partition=...)) worker-side — and every queue
name is suffixed <name>@<partition> so the two deployments never cross-consume.
Match those and a ctx.remote(chargeStep, …) is delivered to whichever worker registered
chargeStep.name. Nothing else to register — there's no handshake or service discovery; the queue
name is the routing.
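
The naming scheme above can be written out as two small functions. This is only an illustration of the documented pattern, not an SDK helper: the function names are hypothetical, and leaving the results queue without a partition suffix is an assumption taken from the diagram.

```python
def tasks_queue(name, prefix="durable", partition=None):
    """Per-step tasks queue: <prefix>-tasks-<name>, plus @<partition> when set."""
    queue = f"{prefix}-tasks-{name}"
    return f"{queue}@{partition}" if partition else queue


def results_queue(prefix="durable"):
    """Shared results queue: <prefix>-results (assumed unpartitioned, as in the diagram)."""
    return f"{prefix}-results"


default_queue = tasks_queue("payments.charge-card")
laptop_queue = tasks_queue("payments.charge-card", partition="dev-laptop")
```

Two deployments with different partitions produce different queue names for the same step, which is why they never consume each other's tasks.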
A few properties that follow from this being queue-based:
- At-least-once delivery. A task can be redelivered (a worker crash, a reconnect), so handlers
  should be idempotent: key external side effects on stepId (or your own dedupe key). The engine
  already dedupes a step that completed, but a handler can be re-run before its result is recorded.
- Workers are stateless and horizontal. Run as many copies as you want for a given step; Redis hands
  each task to one of them. Scale a slow step by adding workers, not by touching the workflow.
- The engine owns correlation. Every result carries the stepId it answers; the engine matches it to
  the suspended ctx.remote and resumes the run. The worker never needs the run's state.
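
One way to make a handler safe under at-least-once delivery is to record the outcome of a side effect under a dedupe key and return the recorded outcome on redelivery. The sketch below assumes an in-memory store purely for illustration (a real worker would need a durable one, such as Redis or a database table); `IdempotencyStore`, `charge_card` and `handle_charge` are hypothetical names.

```python
import threading


class IdempotencyStore:
    """In-memory stand-in for a durable dedupe store (hypothetical)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._results = {}

    def run_once(self, key, side_effect):
        """Run side_effect() at most once per key and return the recorded result."""
        with self._lock:
            if key in self._results:
                return self._results[key]
            result = side_effect()
            self._results[key] = result
            return result


store = IdempotencyStore()
charges_made = []


def charge_card(order_id, amount_cents):
    # Hypothetical side effect standing in for a real payment call.
    charges_made.append((order_id, amount_cents))
    return {"chargeId": f"ch_{order_id}"}


def handle_charge(data):
    # Dedupe key derived from the input; a stepId would serve the same purpose.
    key = f"charge:{data['orderId']}"
    return store.run_once(key, lambda: charge_card(data["orderId"], data["amountCents"]))


first = handle_charge({"orderId": "o1", "amountCents": 4200})
second = handle_charge({"orderId": "o1", "amountCents": 4200})  # simulated redelivery
```

Holding the lock while the side effect runs keeps the sketch short but serializes all calls; a production store would lock per key instead.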
Authentication
The connection is a standard Redis URL, so credentials and TLS go in the URL:

    await run_redis_worker(
        worker,
        connection="rediss://default:my-password@my-redis.example.com:6379",
    )

Use redis:// for plaintext and rediss:// for TLS. This is the same Redis the engine connects to
— if your NestJS app authenticates with a username/password, the worker uses the same ones.
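
Because the credentials live inside a URL, a password containing characters such as `@` or `/` has to be percent-encoded or the URL will not parse as intended. A small sketch of assembling the connection string follows; `build_redis_url` is a hypothetical helper, and it assumes the Redis client percent-decodes the credentials, as standard URL parsers do.

```python
from urllib.parse import quote


def build_redis_url(host, port=6379, username=None, password=None, tls=False):
    """Assemble a redis:// or rediss:// URL with percent-encoded credentials."""
    scheme = "rediss" if tls else "redis"
    auth = ""
    if password is not None:
        user = quote(username or "", safe="")
        auth = f"{user}:{quote(password, safe='')}@"
    return f"{scheme}://{auth}{host}:{port}"


secure_url = build_redis_url(
    "my-redis.example.com", username="default", password="p@ss/word", tls=True
)
local_url = build_redis_url("localhost")
```

The resulting string is what you would pass as `connection=` to `run_redis_worker`.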
run_redis_worker options
| Option | Default | Description |
|---|---|---|
| worker | (required) | The Worker whose handlers run. |
| partition | None | Optional isolation suffix: appends @<partition> to every one of this worker's per-name queues. |
| connection | redis://localhost:6379 | Redis URL (the broker the engine also uses). |
| prefix | durable | Queue-key namespace; must match the engine's BullMQTransport prefix. |
Other transports
The Redis runner above is one of three runners; pick the one matching the engine's
transport. All of them run the same handlers (@worker.step(...)); only the
runner that feeds them changes. Unlike Redis, the SQS and SQL runners still route by a single
declared group (below); otherwise the worker's prefix and queue/table namespace must match the engine's.
AWS SQS
For an engine on the SQS transport. Install the extra:
pip install "durable-worker[sqs]".

    from durable_worker.sqs_runner import run_sqs_worker

    run_sqs_worker(worker, group="payments", region="us-east-1")  # blocks; long-polls the tasks queue

Queue names default to <prefix>-tasks-<group> and <prefix>-results (resolved via GetQueueUrl);
pass tasks_queue_url / results_queue_url to reuse queues you already own, or client= to inject a
preconfigured boto3 client. SQS has no push model, so this is a blocking loop — run it in its own
process and pass a threading.Event as stop to shut it down.
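
The stop-event pattern can be sketched with a stand-in loop so it runs without AWS access. `blocking_loop` below is hypothetical and only imitates a blocking runner; in a real worker, `run_sqs_worker(worker, group="payments", region="us-east-1", stop=stop)` would take its place, and `stop.set()` would typically be called from a SIGTERM or SIGINT handler.

```python
import threading
import time


def blocking_loop(stop, poll_seconds=0.01):
    """Stand-in for a blocking runner: keep polling until `stop` is set."""
    polls = 0
    while not stop.is_set():
        polls += 1
        time.sleep(poll_seconds)  # a real runner would long-poll the queue here
    return polls


stop = threading.Event()
runner = threading.Thread(target=blocking_loop, args=(stop,), daemon=True)
runner.start()

time.sleep(0.05)        # let the loop spin a few times
stop.set()              # request shutdown
runner.join(timeout=2)  # the loop notices the event and returns
```

The loop only checks the event between polls, so shutdown latency is bounded by one poll interval; with SQS long-polling that can be several seconds.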
SQL (database)
For an engine on the broker-less SQL transport — your Python workers poll the
same Postgres or MySQL tables, no broker at all. Install the driver extra:
pip install "durable-worker[postgres]" or "durable-worker[mysql]".

    from durable_worker.db_runner import run_db_worker

    await run_db_worker(
        worker,
        group="pipeline",
        dialect="postgres",  # or "mysql"
        dsn="postgres://user:pass@localhost:5432/app",
    )

It implements the documented table + claim contract: claiming
tasks with SELECT … FOR UPDATE SKIP LOCKED, running the handler, writing a result row, deleting the
task. Because both libraries target that one spec, a Node engine and a Python worker share the same
two tables (the prefix must match). Requires Postgres 9.5+ or MySQL 8+ (SQLite has no SKIP LOCKED).
The wire protocol
The contract is plain JSON, identical across languages. The orchestrator dispatches a task:

    {
      "runId": "wrun_8Kb2", "seq": 1, "name": "payments.charge-card",
      "stepId": "wrun_8Kb2:1", "group": "payments",
      "input": { "orderId": "o1", "amountCents": 4200 }, "attempt": 1
    }

The worker replies with a result:

    {
      "runId": "wrun_8Kb2", "seq": 1, "stepId": "wrun_8Kb2:1",
      "status": "completed", "output": { "chargeId": "ch_1" },
      "startedAt": 1718900000123
    }

startedAt is the epoch-ms moment the worker picked the task up. The engine subtracts its own
dispatch time from it to report queue-wait: how long the step
sat in the queue before a worker was free — so a Python step's queue time shows up in the dashboard
just like a Node one. The SDK stamps it for you.
Worker.process_task(task) -> result is the pure, transport-agnostic core (no broker needed — ideal
for unit tests). Because the contract is language-neutral, a Go or Rust worker can implement the same
thing — Python is just the first SDK.
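
To show how little a worker in any language needs to implement, here is a minimal sketch of the task-to-result mapping using plain dicts. It is not the SDK's `Worker.process_task`: the function name and the `handlers` mapping are hypothetical, and the shape of the failure reply (`"status": "failed"` with an `"error"` string) is an assumption, since only the completed reply is shown above.

```python
import time


def process_task_sketch(task, handlers):
    """Turn a task dict into a result dict that echoes the correlation fields."""
    reply = {
        "runId": task["runId"],
        "seq": task["seq"],
        "stepId": task["stepId"],
        "startedAt": int(time.time() * 1000),  # epoch ms at pickup
    }
    try:
        output = handlers[task["name"]](task["input"])
    except Exception as exc:
        # Assumed failure shape; consult the protocol spec for the real fields.
        return {**reply, "status": "failed", "error": str(exc)}
    return {**reply, "status": "completed", "output": output}


handlers = {"payments.charge-card": lambda data: {"chargeId": "ch_1"}}
task = {
    "runId": "wrun_8Kb2", "seq": 1, "name": "payments.charge-card",
    "stepId": "wrun_8Kb2:1", "group": "payments",
    "input": {"orderId": "o1", "amountCents": 4200}, "attempt": 1,
}
result = process_task_sketch(task, handlers)
```

Because the function touches no broker, the same style of call works in a unit test: build a task dict, invoke the core, assert on the result.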
Cooperative cancellation
When a run is cancelled — engine.cancel(runId) on the NestJS side — a step that's already running
in a worker can't be force-killed from outside the process. Instead the orchestrator broadcasts a
control message and the handler bails out on its own at a safe point. This works cross-language: the
control plane is the same one the TypeScript engine uses, so a Python worker observes engine.cancel
just like a Node one.
The mechanics:
- The orchestrator publishes {"kind": "cancel", "runId": ...} on its control channel
  (<prefix>-control, the same channel the TS control plane uses).
- run_redis_worker subscribes to that channel and feeds a CancellationRegistry, a thread-safe set of
  cancelled run ids. It does this automatically; one is created for you if you don't pass your own.
- The registry is surfaced to your handler through the step context as ctx.cancelled (a bool) and
  ctx.raise_if_cancelled() (raises Cancelled). A long handler checks it at safe points and returns or
  raises, so it stops doing work whose result is going to be discarded.
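
The mechanics above amount to a lock-protected set fed by control messages. The sketch below is a stand-in written for illustration, not the SDK's `CancellationRegistry`; the class name and `handle_control_message` are hypothetical, and only `is_cancelled` and the message shape come from the text.

```python
import json
import threading


class SketchCancellationRegistry:
    """Thread-safe set of cancelled run ids (illustrative stand-in)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cancelled = set()

    def handle_control_message(self, raw):
        """Record the run id carried by a {"kind": "cancel", "runId": ...} message."""
        message = json.loads(raw)
        if message.get("kind") == "cancel" and "runId" in message:
            with self._lock:
                self._cancelled.add(message["runId"])

    def is_cancelled(self, run_id):
        with self._lock:
            return run_id in self._cancelled


registry = SketchCancellationRegistry()
registry.handle_control_message('{"kind": "cancel", "runId": "wrun_8Kb2"}')
registry.handle_control_message('{"kind": "ping"}')  # other kinds are ignored
```

A subscriber thread would call `handle_control_message` for each message on the control channel, while handler threads read `is_cancelled`; the lock keeps the two from racing.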
Opt in by declaring the second ctx parameter on your handler (same context that carries the
step events), then poll it inside the loop:

    from durable_worker import Worker, Cancelled

    worker = Worker()

    @worker.step("pipeline.transcode")
    def transcode(data, ctx):
        out = []
        for chunk in data["chunks"]:
            # Bail out before starting more expensive work: the run was cancelled,
            # so finishing the remaining chunks would just be thrown away.
            if ctx.cancelled:
                ctx.warn("cancelled, stopping early", {"done": len(out)})
                return {"partial": out, "cancelled": True}
            out.append(encode_chunk(chunk))
        return {"chunks": out}

ctx.raise_if_cancelled() is the one-liner equivalent when you'd rather abort than return a partial
result — it raises Cancelled (a subclass of Exception), which surfaces as a failed step:

    @worker.step("pipeline.crawl")
    async def crawl(data, ctx):
        results = []
        for url in data["urls"]:
            ctx.raise_if_cancelled()  # raises Cancelled the moment the run is cancelled
            results.append(await fetch(url))
        return {"results": results}

run_redis_worker passes registry.is_cancelled into process_task for you, so ctx.cancelled
reflects cancellation automatically. If you want to drive a registry yourself (e.g. you run the worker
on a transport without pub/sub, or you want to share one registry across runners), construct it and
pass it in:

    from durable_worker.cancellation import CancellationRegistry

    cancellation = CancellationRegistry()

    await run_redis_worker(
        worker,
        connection="redis://localhost:6379",
        cancellation=cancellation,
    )

Cancellation is cooperative by design: a handler that never checks ctx.cancelled runs to completion,
and the engine just discards its result. Sprinkle the check at the natural boundaries of a long
handler (between chunks, before an expensive call) so it can let go promptly.
The control-channel subscription is best-effort — it needs the redis async client for pub/sub.
If it isn't available the runner logs and continues; steps still run, cancellation just won't be
observed.
Multi-broker reply routing
A worker can consume more than one broker at once — typically for failover, where the engine
dispatches the same step over two transports and whichever broker is healthy delivers it. The catch:
the result has to go back on the same broker the task arrived on, otherwise the engine waiting on
the other transport never sees it. The orchestrator stamps each task with the id of the transport it
was dispatched on (task["transport"]), and reply_target does the lookup:

    from durable_worker.routing import reply_target

    # One reply target per broker id (e.g. a results queue / publisher per transport).
    targets = {
        "redis-primary": primary_results,
        "redis-failover": failover_results,
    }

    def on_task(task):
        result = worker.process_task(task, is_cancelled=registry.is_cancelled)
        target = reply_target(task, targets)  # picks the broker the task came in on
        target.send(result)  # reply where you were told, never pick one yourself

reply_target(task, targets) returns the entry keyed by task["transport"]. It falls back to the
lone target when the id is absent (a single transport) or unknown, and returns None only when
targets is empty — so the single-broker case keeps working with a one-entry mapping and no special
casing.
That said, the simpler pattern is one runner per broker: start a run_redis_worker (or SQS / DB
runner) for each broker, each consuming and replying on its own transport. Each runner already publishes
its results on the broker it consumes, so there's nothing to route — no reply_target needed. Reach for
reply_target only when a single worker loop genuinely multiplexes several brokers and has to decide,
per task, where the reply goes.