
SQL (database)

A broker-less, DBOS-style transport — remote steps are rows in the database you already run. Workers claim tasks with SELECT … FOR UPDATE SKIP LOCKED. The table + claim contract is documented so Node and Python workers share it.

The SQL transport needs no broker. A remote step is a row in the same database your durable store already uses: ctx.remote inserts a task row, a worker claims it with SELECT … FOR UPDATE SKIP LOCKED, runs the handler, writes a result row, and the engine polls that row to resume the run. The database you already operate is the queue.

pnpm add @dudousxd/nestjs-durable-transport-db
Engine side:
new DbTransport({ executor: mikroOrmExecutor(em) });           // rides your app's connection
Worker side:
const transport = new DbTransport({ executor: typeOrmExecutor(ds), group: 'pipeline.extract' });
transport.handle('pipeline.extract', async (input) => extract(input));

group is the queue token this instance polls. Set it to the exact step name it serves, optionally suffixed with @<partition>; each DbTransport instance polls exactly one queue.
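Here is a concrete version of the group rule. groupFor is a hypothetical helper, not something the library is known to export. It only spells out the "exact step name, optionally with @<partition>" shape, and it shows why serving several partitions means several transport instances.

```typescript
// Hypothetical helper (not a library export): builds the queue token a
// DbTransport instance polls, i.e. the exact step name, optionally
// suffixed with "@<partition>".
function groupFor(stepName: string, partition?: string): string {
  return partition === undefined ? stepName : `${stepName}@${partition}`;
}

// Each instance polls exactly one queue, so serving two partitions of the
// same step means two DbTransport instances, one per group.
const groups = ['eu', 'us'].map((p) => groupFor('pipeline.extract', p));
// groups is ['pipeline.extract@eu', 'pipeline.extract@us']
```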

Trade-off vs. a real broker: throughput is bounded by polling and row contention. That is a good fit for workflow and pipeline scale (modest rates, long-running steps), not for high-fanout firehoses. The transport requires FOR UPDATE SKIP LOCKED, so it needs Postgres 9.5+ or MySQL 8+; SQLite is not supported.
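To fail fast on an unsupported database, a pre-flight check could compare the server version against these minimums. supportsSkipLocked is a hypothetical helper, and the sketch assumes you already have the server's version string (for example from SHOW server_version on Postgres or SELECT VERSION() on MySQL).

```typescript
type Dialect = 'postgres' | 'mysql' | 'sqlite';

// Hypothetical pre-flight check: does this server support
// SELECT ... FOR UPDATE SKIP LOCKED? (Postgres 9.5+, MySQL 8+, never SQLite.)
function supportsSkipLocked(dialect: Dialect, version: string): boolean {
  // parseInt tolerates suffixes such as "16.2 (Debian ...)" or "8.0.36-log".
  const [major = 0, minor = 0] = version.split('.').map((part) => Number.parseInt(part, 10));
  if (dialect === 'postgres') return major > 9 || (major === 9 && minor >= 5);
  if (dialect === 'mysql') return major >= 8;
  return false; // SQLite has no FOR UPDATE locking clause at all
}
```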

The contract

This is the single source of truth for the table shape and the claim protocol. The TypeScript DbTransport and the Python db_runner both implement this spec, so a Node engine and a Python worker can share the same two tables without drift. Table names are <prefix>_transport_tasks and <prefix>_transport_results (default prefix: durable).

Tables

CREATE TABLE durable_transport_tasks (
  step_id    varchar(191) PRIMARY KEY,   -- "<runId>:<seq>", the step identity
  run_id     varchar(191) NOT NULL,
  seq        integer      NOT NULL,
  name       varchar(191) NOT NULL,      -- step name → handler key
  grp        varchar(191) NOT NULL,      -- worker group → which workers poll it
  input      text,                       -- JSON (longtext on MySQL)
  attempt    integer      NOT NULL,
  status     varchar(32)  NOT NULL,      -- 'pending'
  claimed_by varchar(191),               -- lease owner (debug)
  claimed_at bigint,                     -- lease stamp (epoch ms)
  created_at bigint        NOT NULL
);

CREATE TABLE durable_transport_results (
  step_id    varchar(191) PRIMARY KEY,
  run_id     varchar(191) NOT NULL,
  seq        integer      NOT NULL,
  status     varchar(32)  NOT NULL,      -- 'completed' | 'failed'
  output     text,                       -- JSON
  error      text,                       -- JSON
  started_at bigint,                     -- worker pickup (epoch ms) → queue-wait
  claimed_by varchar(191),
  claimed_at bigint,
  created_at bigint        NOT NULL
);
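For TypeScript code that reads these tables directly (debugging, dashboards), the row shapes map onto types like the ones below. This is a sketch, not the library's exported types: TaskRow, ResultRow, makeStepId, parseStepId and transportTables are hypothetical names. It also assumes the bigint columns have already been converted to number; node-postgres, for one, returns int8 values as strings by default.

```typescript
// Row shapes for the two contract tables (hypothetical types, not library
// exports). bigint columns are assumed already converted to number.
interface TaskRow {
  step_id: string;           // "<runId>:<seq>"
  run_id: string;
  seq: number;
  name: string;              // step name, used as the handler key
  grp: string;               // worker group that polls this row
  input: string | null;      // JSON text
  attempt: number;
  status: string;            // 'pending'
  claimed_by: string | null; // lease owner
  claimed_at: number | null; // lease stamp, epoch ms
  created_at: number;
}

interface ResultRow {
  step_id: string;
  run_id: string;
  seq: number;
  status: 'completed' | 'failed';
  output: string | null;     // JSON text
  error: string | null;      // JSON text
  started_at: number | null; // worker pickup, epoch ms
  claimed_by: string | null;
  claimed_at: number | null;
  created_at: number;
}

// step_id is "<runId>:<seq>".
function makeStepId(runId: string, seq: number): string {
  return `${runId}:${seq}`;
}

// Splits on the last ':' so a run id that itself contains ':' still parses
// (assumption: seq is always a non-negative integer).
function parseStepId(stepId: string): { runId: string; seq: number } {
  const i = stepId.lastIndexOf(':');
  const tail = stepId.slice(i + 1);
  if (i < 0 || !/^\d+$/.test(tail)) throw new Error(`malformed step_id: ${stepId}`);
  return { runId: stepId.slice(0, i), seq: Number(tail) };
}

// Table names: <prefix>_transport_tasks and <prefix>_transport_results.
function transportTables(prefix = 'durable'): { tasks: string; results: string } {
  return { tasks: `${prefix}_transport_tasks`, results: `${prefix}_transport_results` };
}
```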

started_at is what makes queue-wait measurable over this transport: the worker stamps the time it picked the task up, and the engine subtracts its own dispatch time from that stamp.
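The arithmetic is a single subtraction across two machines' clocks. A sketch, with queueWaitMs as a hypothetical name; clamping negative results to zero is a design choice of this sketch, not something the contract specifies.

```typescript
// Hypothetical helper: queue-wait for one remote step, in ms.
// dispatchedAt: the engine's own stamp from when ctx.remote inserted the task row.
// startedAt: the worker's pickup stamp, read from the results row.
function queueWaitMs(dispatchedAt: number, startedAt: number | null): number | null {
  if (startedAt === null) return null; // no pickup stamp was recorded
  // Engine and worker clocks can disagree slightly, so clamp skew at 0.
  return Math.max(0, startedAt - dispatchedAt);
}
```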

Claim protocol

A worker claims a batch atomically, runs each task outside the lock, then records the result:

  1. Claim — in one transaction, lease up to batchSize un-leased rows for the group:
    SELECT * FROM durable_transport_tasks
     WHERE grp = ? AND (claimed_at IS NULL OR claimed_at < ?)   -- ? = now − leaseMs
     ORDER BY created_at ASC LIMIT ? FOR UPDATE SKIP LOCKED;
    UPDATE durable_transport_tasks SET claimed_by = ?, claimed_at = ?
     WHERE step_id IN (…);
    SKIP LOCKED is the heart of it: concurrent workers step over each other's locked rows instead of blocking, so a row is never claimed twice. The claimed_at < now − leaseMs clause reclaims a row whose worker crashed mid-flight (crash recovery, in place of heartbeats).
  2. Run — execute the handler for name with the parsed input.
  3. Record — insert the result row (idempotent: ON CONFLICT DO NOTHING / INSERT IGNORE) and DELETE the task row.
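The three steps can be modelled without a database to see how the lease cutoff and the idempotent record behave. This is an in-memory sketch, not the library's implementation: InMemoryDbQueue and pollOnce are hypothetical, Maps stand in for the two tables, and because JavaScript runs this single-threaded, each claim() call is trivially atomic, whereas a real worker relies on one transaction with FOR UPDATE SKIP LOCKED. Recording an unknown step name as a failed result is also an assumption.

```typescript
// In-memory model of the claim protocol (a sketch, not the library's code).
interface Task {
  stepId: string;           // "<runId>:<seq>"
  name: string;             // handler key
  grp: string;              // queue token
  input: string;            // JSON
  claimedBy: string | null; // lease owner
  claimedAt: number | null; // lease stamp, epoch ms
  createdAt: number;
}

interface Result {
  stepId: string;
  status: 'completed' | 'failed';
  output: string | null; // JSON
  error: string | null;  // JSON
  startedAt: number;     // worker pickup, epoch ms
}

type Handler = (input: unknown) => Promise<unknown>;

class InMemoryDbQueue {
  readonly tasks = new Map<string, Task>();
  readonly results = new Map<string, Result>();

  // 1. Claim: lease up to batchSize rows of this group that are un-leased or
  // whose lease is older than leaseMs, oldest first.
  claim(grp: string, workerId: string, batchSize: number, leaseMs: number, now: number): Task[] {
    const cutoff = now - leaseMs;
    const batch = Array.from(this.tasks.values())
      .filter((t) => t.grp === grp && (t.claimedAt === null || t.claimedAt < cutoff))
      .sort((a, b) => a.createdAt - b.createdAt)
      .slice(0, batchSize);
    for (const t of batch) {
      t.claimedBy = workerId;
      t.claimedAt = now;
    }
    return batch;
  }

  // 3. Record: idempotent insert (first result wins), then delete the task row.
  record(result: Result): void {
    if (!this.results.has(result.stepId)) this.results.set(result.stepId, result);
    this.tasks.delete(result.stepId);
  }
}

// One polling tick: claim a batch, run each handler outside the claim, record.
async function pollOnce(
  q: InMemoryDbQueue,
  grp: string,
  workerId: string,
  handlers: Record<string, Handler>,
  opts: { batchSize: number; leaseMs: number },
): Promise<number> {
  const batch = q.claim(grp, workerId, opts.batchSize, opts.leaseMs, Date.now());
  for (const task of batch) {
    const startedAt = Date.now(); // feeds queue-wait on the engine side
    try {
      const handler = handlers[task.name];
      // Assumption: an unknown step name is recorded as a failed result.
      if (handler === undefined) throw new Error(`no handler for ${task.name}`);
      const output = await handler(JSON.parse(task.input)); // 2. Run
      q.record({ stepId: task.stepId, status: 'completed', output: JSON.stringify(output ?? null), error: null, startedAt });
    } catch (err) {
      q.record({ stepId: task.stepId, status: 'failed', output: null, error: JSON.stringify({ message: String(err) }), startedAt });
    }
  }
  return batch.length;
}
```

Note that claim() stamps claimed_at = now on every row it returns, and there are no heartbeats. A handler that runs longer than leaseMs can therefore be reclaimed by a second worker while the first is still running, which is one more reason handlers must be idempotent.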

The engine side mirrors it on the results table: claim → deliver to the suspended ctx.remote → DELETE. Delivery is at-least-once (a crash between insert-result and delete-task can re-run a handler), so handlers should be idempotent, keyed on step_id.
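Because a handler can run more than once for the same step_id, side effects should be deduplicated on it. The sketch below assumes the handler can obtain its step_id somehow (how DbTransport exposes it to handlers, if at all, is not covered on this page) and uses a hypothetical DedupeStore and runOnce; a Map is enough for the demo, while in production the store would be a table keyed on step_id.

```typescript
// Hypothetical dedupe store keyed on step_id. Map<string, string> satisfies
// it for a demo; in production it would be a table with step_id as primary key.
interface DedupeStore {
  get(stepId: string): string | undefined;
  set(stepId: string, outputJson: string): unknown;
}

// Skip the side effect when this step_id already has a recorded outcome
// (a re-delivery after a crash) and return the stored outcome instead.
async function runOnce<T>(store: DedupeStore, stepId: string, effect: () => Promise<T>): Promise<T> {
  const previous = store.get(stepId);
  if (previous !== undefined) return JSON.parse(previous) as T;
  const output = await effect();
  store.set(stepId, JSON.stringify(output ?? null));
  return output;
}
```

This narrows the re-run window but does not close it: a crash after effect() and before store.set() still repeats the effect, unless the side effect and the dedupe write commit in the same transaction.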

Dialect notes

A few details differ between Postgres and MySQL, and both libraries handle them the same way:

| | Postgres | MySQL |
|---|---|---|
| Placeholders | $1, $2, … | ? / %s |
| Identifier quoting | "col" | `col` |
| Idempotent insert | ON CONFLICT (step_id) DO NOTHING | INSERT IGNORE |
| JSON column | text | longtext |
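These differences are mechanical enough to isolate in a few helpers. The sketch below builds the idempotent result insert from the Record step for either dialect; placeholder, quoteIdent and insertResultSql are hypothetical names, and only the Node-style ? placeholder is generated for MySQL (Python drivers use %s instead).

```typescript
type SqlDialect = 'postgres' | 'mysql';

// i-th parameter (1-based): $1, $2, ... on Postgres, ? on MySQL (Node drivers).
function placeholder(d: SqlDialect, i: number): string {
  return d === 'postgres' ? `$${i}` : '?';
}

// Quote an identifier: "col" on Postgres, `col` on MySQL (embedded quotes doubled).
function quoteIdent(d: SqlDialect, name: string): string {
  return d === 'postgres'
    ? `"${name.replace(/"/g, '""')}"`
    : '`' + name.replace(/`/g, '``') + '`';
}

// Idempotent insert of a result row: a second insert for the same step_id is a no-op.
function insertResultSql(d: SqlDialect, table: string, columns: string[]): string {
  const t = quoteIdent(d, table);
  const cols = columns.map((c) => quoteIdent(d, c)).join(', ');
  const vals = columns.map((_, i) => placeholder(d, i + 1)).join(', ');
  return d === 'postgres'
    ? `INSERT INTO ${t} (${cols}) VALUES (${vals}) ON CONFLICT (${quoteIdent(d, 'step_id')}) DO NOTHING`
    : `INSERT IGNORE INTO ${t} (${cols}) VALUES (${vals})`;
}
```

One caveat on the MySQL side: INSERT IGNORE downgrades more than duplicate-key errors to warnings (some data-conversion errors too), whereas ON CONFLICT (step_id) DO NOTHING only suppresses the conflict on step_id.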
