SQL (database)
A broker-less, DBOS-style transport — remote steps are rows in the database you already run. Workers claim tasks with SELECT … FOR UPDATE SKIP LOCKED. The table + claim contract is documented so Node and Python workers share it.
The SQL transport needs no broker. A remote step is a row in the same database your durable
store already uses: ctx.remote inserts a task row, a worker claims it with
SELECT … FOR UPDATE SKIP LOCKED, runs the handler, writes a result row, and the engine polls that
row to resume the run. The database you already operate is the queue.
```bash
pnpm add @dudousxd/nestjs-durable-transport-db
```

```typescript
new DbTransport({ executor: mikroOrmExecutor(em) }); // rides your app's connection
```

```typescript
const transport = new DbTransport({ executor: typeOrmExecutor(ds), group: 'pipeline.extract' });
transport.handle('pipeline.extract', async (input) => extract(input));
```

`group` is the queue token this instance polls. Set it to the exact (optionally `@<partition>`-suffixed) step name it serves; each `DbTransport` instance polls one queue.
Trade-off vs a real broker: throughput is bounded by polling and row contention. That suits workflow/pipeline scale (modest rates, long-running steps), not high-fan-out firehoses. The transport also requires
`FOR UPDATE SKIP LOCKED`: Postgres 9.5+ or MySQL 8+ (not SQLite).
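A boot-time guard can reject an unsupported database up front instead of failing on the first claim. The sketch below encodes only the version floor stated above. `supportsSkipLocked`, `assertSkipLocked` and the `DbServer` shape are hypothetical names, not part of `DbTransport`'s API, and how you read the server version from your driver is left open.

```typescript
// Hypothetical boot-time guard, not part of DbTransport's API.
// The claim query relies on FOR UPDATE SKIP LOCKED, which the contract
// requires from Postgres 9.5+ or MySQL 8+ and which SQLite lacks.
type ServerDialect = 'postgres' | 'mysql' | 'sqlite';

interface DbServer {
  dialect: ServerDialect;
  major: number; // e.g. 16 for Postgres 16.2
  minor: number; // e.g. 2 for Postgres 16.2
}

function supportsSkipLocked(server: DbServer): boolean {
  if (server.dialect === 'postgres') {
    return server.major > 9 || (server.major === 9 && server.minor >= 5);
  }
  if (server.dialect === 'mysql') {
    return server.major >= 8;
  }
  return false; // SQLite: no FOR UPDATE SKIP LOCKED
}

// Fail fast at boot rather than on the first claim.
function assertSkipLocked(server: DbServer): void {
  if (!supportsSkipLocked(server)) {
    throw new Error(
      `FOR UPDATE SKIP LOCKED is unavailable on ${server.dialect} ${server.major}.${server.minor}`,
    );
  }
}
```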
The contract
This is the single source of truth for the table shape and the claim protocol. The TypeScript
`DbTransport` and the Python `db_runner` both implement this spec, so a Node engine and a Python worker share the same two tables without drift. Table names are
`<prefix>_transport_tasks` and `<prefix>_transport_results` (default prefix `durable`).
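The naming rule is simple enough to centralize so both sides cannot disagree on it. A minimal sketch, assuming a hypothetical `transportTables` helper (not an export of either library); because the prefix ends up interpolated into SQL, it is restricted to a plain identifier rather than escaped.

```typescript
// Hypothetical helper: derive the two contract tables from one prefix.
// The prefix is interpolated into SQL, so it is restricted to a plain
// identifier instead of being quoted or escaped.
interface TransportTables {
  tasks: string;
  results: string;
}

function transportTables(prefix = 'durable'): TransportTables {
  if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(prefix)) {
    throw new Error(`invalid table prefix: ${JSON.stringify(prefix)}`);
  }
  return {
    tasks: `${prefix}_transport_tasks`,
    results: `${prefix}_transport_results`,
  };
}

// transportTables()          → { tasks: 'durable_transport_tasks', results: 'durable_transport_results' }
// transportTables('billing') → { tasks: 'billing_transport_tasks', results: 'billing_transport_results' }
```

Whatever prefix the Node engine uses, the Python worker has to be configured with the same one; otherwise the two sides poll different tables.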
Tables
```sql
CREATE TABLE durable_transport_tasks (
  step_id varchar(191) PRIMARY KEY, -- "<runId>:<seq>", the step identity
  run_id varchar(191) NOT NULL,
  seq integer NOT NULL,
  name varchar(191) NOT NULL,       -- step name → handler key
  grp varchar(191) NOT NULL,        -- worker group → which workers poll it
  input text,                       -- JSON (longtext on MySQL)
  attempt integer NOT NULL,
  status varchar(32) NOT NULL,      -- 'pending'
  claimed_by varchar(191),          -- lease owner (debug)
  claimed_at bigint,                -- lease stamp (epoch ms)
  created_at bigint NOT NULL
);

CREATE TABLE durable_transport_results (
  step_id varchar(191) PRIMARY KEY,
  run_id varchar(191) NOT NULL,
  seq integer NOT NULL,
  status varchar(32) NOT NULL,      -- 'completed' | 'failed'
  output text,                      -- JSON
  error text,                       -- JSON
  started_at bigint,                -- worker pickup (epoch ms) → queue-wait
  claimed_by varchar(191),
  claimed_at bigint,
  created_at bigint NOT NULL
);
```

`started_at` is what makes queue-wait measurable over this transport: the worker stamps the moment it picked the task up, and the engine subtracts its own dispatch time.
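For code that reads or writes these tables directly, the rows map onto plain TypeScript types. The sketch below is hypothetical: `TaskRow`, `ResultRow`, `stepIdOf`, `newTaskRow` and `queueWaitMs` are illustrative names, `bigint` epoch-ms columns are modelled as `number`, and `dispatchedAt` stands in for the engine's dispatch time, whose storage the contract does not specify.

```typescript
// Hypothetical mirror of the two contract rows (column names kept as-is).
interface TaskRow {
  step_id: string;            // "<runId>:<seq>"
  run_id: string;
  seq: number;
  name: string;               // step name → handler key
  grp: string;                // worker group → which workers poll it
  input: string | null;       // JSON text
  attempt: number;
  status: 'pending';
  claimed_by: string | null;  // lease owner (debug)
  claimed_at: number | null;  // lease stamp (epoch ms)
  created_at: number;
}

interface ResultRow {
  step_id: string;
  run_id: string;
  seq: number;
  status: 'completed' | 'failed';
  output: string | null;      // JSON text
  error: string | null;       // JSON text
  started_at: number | null;  // worker pickup (epoch ms)
  claimed_by: string | null;
  claimed_at: number | null;
  created_at: number;
}

// Step identity as the contract spells it: "<runId>:<seq>".
function stepIdOf(runId: string, seq: number): string {
  return `${runId}:${seq}`;
}

// A fresh, un-leased task row of the kind ctx.remote inserts.
function newTaskRow(
  runId: string, seq: number, name: string, grp: string,
  input: unknown, attempt: number, now: number,
): TaskRow {
  return {
    step_id: stepIdOf(runId, seq),
    run_id: runId,
    seq,
    name,
    grp,
    input: input === undefined ? null : JSON.stringify(input),
    attempt,
    status: 'pending',
    claimed_by: null, // no lease until a worker claims it
    claimed_at: null,
    created_at: now,
  };
}

// Queue-wait = worker pickup time − engine dispatch time.
// Undefined when the worker did not stamp started_at.
function queueWaitMs(dispatchedAt: number, result: ResultRow): number | undefined {
  return result.started_at == null ? undefined : result.started_at - dispatchedAt;
}
```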
Claim protocol
A worker claims a batch atomically, runs each task outside the lock, then records the result:
- Claim: in one transaction, lease up to `batchSize` rows for the group that are un-leased or whose lease has expired:

  ```sql
  SELECT * FROM durable_transport_tasks
  WHERE grp = ?
    AND (claimed_at IS NULL OR claimed_at < ?) -- ? = now − leaseMs
  ORDER BY created_at ASC
  LIMIT ?
  FOR UPDATE SKIP LOCKED;

  UPDATE durable_transport_tasks
  SET claimed_by = ?, claimed_at = ?
  WHERE step_id IN (…);
  ```

  `SKIP LOCKED` is the heart of it: concurrent workers step over each other's locked rows instead of blocking, so two concurrent claims never return the same row. The `claimed_at < now − leaseMs` clause reclaims a row whose worker crashed mid-flight (crash recovery, in place of heartbeats).
- Run: execute the handler for `name` with the parsed `input`.
- Record: insert the result row idempotently (`ON CONFLICT DO NOTHING` / `INSERT IGNORE`) and `DELETE` the task row.
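To make the lease rule concrete, here is an in-memory model of the claim step's selection logic. It is a single-process sketch under stated assumptions: an array stands in for the tasks table, so it shows the lease predicate, the `created_at` ordering and the batch limit, but not `SKIP LOCKED` itself, which only exists inside the database. `Task`, `claimBatch` and `completeTask` are hypothetical names.

```typescript
// In-memory stand-in for durable_transport_tasks (subset of columns).
interface Task {
  step_id: string;
  grp: string;
  created_at: number;
  claimed_by: string | null;
  claimed_at: number | null;
}

// Mirrors the claim query:
//   WHERE grp = ? AND (claimed_at IS NULL OR claimed_at < now - leaseMs)
//   ORDER BY created_at ASC LIMIT batchSize
// followed by the UPDATE that stamps the lease. In the real protocol both
// statements run in one transaction with FOR UPDATE SKIP LOCKED.
function claimBatch(
  tasks: Task[],
  grp: string,
  workerId: string,
  now: number,
  leaseMs: number,
  batchSize: number,
): Task[] {
  const cutoff = now - leaseMs;
  const claimable = tasks
    .filter((t) => t.grp === grp && (t.claimed_at === null || t.claimed_at < cutoff))
    .sort((a, b) => a.created_at - b.created_at)
    .slice(0, batchSize);
  for (const t of claimable) {
    t.claimed_by = workerId; // lease owner (debug)
    t.claimed_at = now;      // lease stamp
  }
  return claimable;
}

// Record step, task side: once the result row exists, delete the task row.
function completeTask(tasks: Task[], stepId: string): void {
  const i = tasks.findIndex((t) => t.step_id === stepId);
  if (i !== -1) tasks.splice(i, 1);
}
```

Calling `claimBatch` twice for the same group returns disjoint batches, because the first call stamps `claimed_at = now`, which is never older than `now − leaseMs`. A row left behind by a crashed worker becomes claimable again once `now − leaseMs` moves past its `claimed_at`.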
The engine side mirrors this on the results table: claim → deliver to the suspended `ctx.remote` →
`DELETE`. Delivery is at-least-once (a crash between inserting the result and deleting the task, or a handler that outlives its lease, can re-run a
handler), so handlers should be idempotent, keyed on `step_id`.
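One way to make a handler idempotent is to remember each `step_id` whose side effect has already been applied and return the stored output on a re-run. This sketch assumes the handler can see the step's `step_id`; how the real handler receives it is not covered here. `idempotent` and `Handler` are hypothetical names, and the `Map` is only an in-process stand-in for a durable dedupe record.

```typescript
type Handler<I, O> = (input: I, stepId: string) => Promise<O>;

// Wrap a handler so a re-delivered step_id returns the first result
// instead of repeating the side effect. The Map is an in-process
// stand-in for a durable dedupe table keyed on step_id.
function idempotent<I, O>(
  handler: Handler<I, O>,
  seen: Map<string, O> = new Map<string, O>(),
): Handler<I, O> {
  return async (input, stepId) => {
    if (seen.has(stepId)) return seen.get(stepId) as O;
    const output = await handler(input, stepId);
    seen.set(stepId, output);
    return output;
  };
}
```

The check-then-set above does not protect against two overlapping deliveries of the same step. A durable version would rather record `step_id` under a unique constraint in the same transaction as the side effect, so the database rejects the duplicate.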
Dialect notes
Four things differ between Postgres and MySQL, and both libraries handle them the same way:
| | Postgres | MySQL |
|---|---|---|
| Placeholders | `$1`, `$2`, … | `?` / `%s` |
| Identifier quoting | `"col"` | `` `col` `` |
| Idempotent insert | `ON CONFLICT (step_id) DO NOTHING` | `INSERT IGNORE` |
| JSON column | `text` | `longtext` |
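The first three rows are mechanical enough to centralize in one place. The sketch below renders the Record step's idempotent result insert for either dialect. `SqlDialect`, `quoteIdent`, `placeholder` and `resultInsertSql` are hypothetical names; the sketch uses the `?` placeholder style for MySQL and leaves out the JSON column type, which only matters for DDL.

```typescript
type SqlDialect = 'postgres' | 'mysql';

// Quote an identifier per dialect: "col" on Postgres, `col` on MySQL.
// Embedded quote characters are not escaped, so pass only trusted names.
function quoteIdent(dialect: SqlDialect, name: string): string {
  return dialect === 'postgres' ? `"${name}"` : `\`${name}\``;
}

// Positional placeholder per dialect: $1, $2, … on Postgres, ? on MySQL.
function placeholder(dialect: SqlDialect, index: number): string {
  return dialect === 'postgres' ? `$${index}` : '?';
}

// Idempotent insert for the results table, per the dialect table above.
function resultInsertSql(dialect: SqlDialect, table: string, columns: string[]): string {
  const target = quoteIdent(dialect, table);
  const cols = columns.map((c) => quoteIdent(dialect, c)).join(', ');
  const vals = columns.map((_, i) => placeholder(dialect, i + 1)).join(', ');
  return dialect === 'postgres'
    ? `INSERT INTO ${target} (${cols}) VALUES (${vals}) ON CONFLICT (${quoteIdent(dialect, 'step_id')}) DO NOTHING`
    : `INSERT IGNORE INTO ${target} (${cols}) VALUES (${vals})`;
}
```

Keeping the dialect switch in one function means the claim, record and delete statements all agree on quoting and placeholders, which is the kind of drift the shared contract is meant to prevent.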