Aviary

Workflows & steps

Declaring workflows with @Workflow, running local steps with ctx.step, declaring typed remote steps with remoteStep and calling them with ctx.remote (handled by @DurableStep methods), plus retries, fan-out, fatal errors, sub-process events and tags.

Workflows

A workflow is a provider decorated with @Workflow; its run(ctx, input) method is the deterministic body the engine executes and replays.

@Workflow({ name: 'checkout', version: '1' })
export class CheckoutWorkflow {
  constructor(private readonly inventory: InventoryService) {}
  async run(ctx: WorkflowCtx, order: Order) {
    /* ... */
  }
}
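Because run is replayed, the engine re-executes the body from the top and returns saved results for completed steps instead of running them again. The sketch below models that idea under stated assumptions: ToyCtx, checkoutRun and fetchPrice are hypothetical names, checkpoints are keyed by step name for brevity, and none of this is the library's WorkflowCtx or engine.

```typescript
// Toy model of replay (hypothetical): ToyCtx is NOT the library's WorkflowCtx.
type Checkpoint = { status: 'completed'; result: unknown };

class ToyCtx {
  constructor(private readonly store: Map<string, Checkpoint>) {}

  async step<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
    const saved = this.store.get(stepName);
    // A completed checkpoint short-circuits: the saved result is returned and fn is not called.
    if (saved) return saved.result as T;
    const result = await fn();
    this.store.set(stepName, { status: 'completed', result });
    return result;
  }
}

let pricingCalls = 0;

// Stands in for a side effect such as this.pricing.fetch(order).
async function fetchPrice(): Promise<number> {
  pricingCalls++;
  return 42;
}

// The workflow body: with the same input and checkpoints it issues the same steps in the same order.
async function checkoutRun(ctx: ToyCtx, orderId: string): Promise<string> {
  const price = await ctx.step('quote', fetchPrice);
  return `${orderId}:${price}`;
}
```

In this model fetchPrice runs once even though checkoutRun runs twice, which is why anything non-deterministic (clocks, random IDs, network calls) belongs inside a step rather than directly in run.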

Register it as a normal provider; DurableModule discovers it. Start a run with WorkflowService.start(CheckoutWorkflow, input): pass the workflow class to get a typed input, or a name string to start a cross-runtime workflow.
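The class-or-name choice maps naturally onto TypeScript overloads. This is a hypothetical sketch of how such a signature can type the input from run(ctx, input) when given a class while accepting unknown input for a name string; ToyWorkflowService, register and CheckoutLike are illustrative names, not WorkflowService's actual API.

```typescript
// Hypothetical sketch of a start() overload: a class gives a typed input, a string does not.
type Runnable<I> = { run(ctx: unknown, input: I): Promise<unknown> };
type WorkflowClass<I> = new (...args: never[]) => Runnable<I>;

class ToyWorkflowService {
  // Stands in for the metadata a decorator like @Workflow({ name }) records.
  private readonly names = new Map<Function, string>();

  register(cls: Function, workflowName: string): void {
    this.names.set(cls, workflowName);
  }

  // Class form: the input type is checked against run(ctx, input).
  start<I>(workflow: WorkflowClass<I>, input: I): { workflow: string; input: I };
  // Name form: for a workflow this process cannot import, so the input is unchecked.
  start(workflow: string, input: unknown): { workflow: string; input: unknown };
  start(workflow: WorkflowClass<unknown> | string, input: unknown): { workflow: string; input: unknown } {
    const resolved = typeof workflow === 'string' ? workflow : this.names.get(workflow);
    if (resolved === undefined) throw new Error('workflow class was never registered');
    return { workflow: resolved, input };
  }
}

type Order = { id: string; total: number };

class CheckoutLike {
  async run(_ctx: unknown, order: Order): Promise<string> {
    return order.id;
  }
}
```

With the class form, passing an input that does not match Order is a compile-time error; with the name form, checking the input is left to the runtime that owns the workflow.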

Local steps

ctx.step(name, fn, opts?) runs a unit of work locally and checkpoints its result.

const quote = await ctx.step('quote', () => this.pricing.fetch(order), { retries: 3 });
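The retries option can be pictured as a loop around the step body that checkpoints only a successful result. A minimal sketch, assuming that retries counts extra attempts after the first (so retries: 3 allows four attempts); ToyStepRunner is a hypothetical name and this is not the library's implementation.

```typescript
// Hypothetical toy of ctx.step(name, fn, { retries }); not the library's implementation.
// Assumption: `retries` counts extra attempts after the first (retries: 3 allows 4 attempts).
type StepOpts = { retries?: number };

class ToyStepRunner {
  readonly checkpoints = new Map<string, unknown>();

  async step<T>(stepName: string, fn: () => Promise<T>, opts: StepOpts = {}): Promise<T> {
    if (this.checkpoints.has(stepName)) return this.checkpoints.get(stepName) as T;
    const maxAttempts = 1 + (opts.retries ?? 0);
    let lastError: unknown;
    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        const result = await fn();
        this.checkpoints.set(stepName, result); // only a successful result is checkpointed
        return result;
      } catch (err) {
        lastError = err; // this toy retries immediately, with no backoff
      }
    }
    throw lastError; // retry budget exhausted: the step fails with the last error
  }
}
```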

In-flight visibility

When a local step's body begins, the engine emits a step.started lifecycle event and, by default, writes a running checkpoint, so a long step appears in the dashboard as soon as it starts rather than only once it finishes. The running checkpoint is a placeholder that the step's completed or failed result overwrites. It never short-circuits replay (only a completed checkpoint does), so if the process crashes mid-body, the step simply runs again.
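The rule that only a completed checkpoint short-circuits replay can be modeled with a small status record per step. In this hypothetical sketch (ToyEngine and StepRecord are illustrative names, not the library's types), a crash is simulated by a body that never finishes, and a second engine sharing the same records plays the restarted process.

```typescript
// Hypothetical toy: a 'running' record is a placeholder; only 'completed' short-circuits replay.
type StepRecord =
  | { status: 'running' }
  | { status: 'completed'; result: unknown }
  | { status: 'failed'; error: string };

class ToyEngine {
  constructor(readonly records: Map<string, StepRecord> = new Map()) {}

  async step<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
    const rec = this.records.get(stepName);
    if (rec && rec.status === 'completed') return rec.result as T; // replay: skip the body
    // Anything else (missing, 'running', 'failed') falls through and the body runs again.
    this.records.set(stepName, { status: 'running' }); // visible while the body is in flight
    try {
      const result = await fn();
      this.records.set(stepName, { status: 'completed', result }); // overwrites the placeholder
      return result;
    } catch (err) {
      this.records.set(stepName, { status: 'failed', error: String(err) });
      throw err;
    }
  }
}
```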

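Toggle this with the engine's trackStepStart option (default true). The step.started event fires either way, so the live event stream always sees the start; the flag only controls the extra checkpoint write. On hot paths with many short local steps, set it to false to halve their checkpoint writes (one completed write per step instead of a running write plus a completed write). The cost is in-flight visibility that survives a reload: without the running checkpoint, a step that is still executing is visible only through the live event stream.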
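The write savings follow directly from the description above. A hypothetical sketch that counts checkpoint writes with the flag on and off; ToyTracker is an illustrative name, and only the option name trackStepStart and the step.started event come from this page.

```typescript
// Hypothetical toy of the trackStepStart trade-off; the wiring is illustrative, not the engine's.
type StartedEvent = { type: 'step.started'; step: string };

class ToyTracker {
  readonly events: StartedEvent[] = [];
  checkpointWrites = 0;

  constructor(private readonly trackStepStart = true) {}

  async step<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
    this.events.push({ type: 'step.started', step: stepName }); // emitted regardless of the flag
    if (this.trackStepStart) this.checkpointWrites++; // optional 'running' checkpoint write
    const result = await fn();
    this.checkpointWrites++; // the 'completed' checkpoint write always happens
    return result;
  }
}
```

For ten short steps, the tracker with the flag on performs twenty writes and the one with it off performs ten, while both record ten step.started events.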

Remote steps

Declare a typed remote step with remoteStep, then call it with ctx.remote. The engine dispatches the call over the transport and checkpoints the result; the handler can live in another process or another language.

import { z } from 'zod';

export const chargeCard = remoteStep({
  name: 'payments.charge-card',
  input: z.object({ orderId: z.string(), amountCents: z.number().int() }),
  output: z.object({ chargeId: z.string() }),
  retries: 3,
});

// in the workflow:
const charge = await ctx.remote(chargeCard, { orderId: order.id, amountCents: order.total });

With an in-process transport, a provider method decorated with @DurableStep handles the call, registered under the same name as the remoteStep:

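import { Injectable } from '@nestjs/common';

@Injectable()
export class PaymentsWorker {
  // StripeService is a placeholder for whatever client wraps your payment provider.
  constructor(private readonly stripe: StripeService) {}

  @DurableStep('payments.charge-card')
  async charge(input: { orderId: string; amountCents: number }) {
    return { chargeId: await this.stripe.charge(input) };
  }
}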
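Dispatch by name is what lets the handler live anywhere. This hypothetical sketch models an in-process transport: handlers are registered under a step name (the role @DurableStep plays), and a remote call looks the handler up by that name and checkpoints the output. defineRemoteStep, ToyTransport and ToyRemoteCtx are illustrative names; schema validation is omitted and checkpoints are keyed by step name for brevity.

```typescript
// Hypothetical toy of an in-process transport; not the library's remoteStep/ctx.remote.
type RemoteStepDef<I, O> = { readonly name: string; readonly _phantom?: (input: I) => O };

function defineRemoteStep<I, O>(stepName: string): RemoteStepDef<I, O> {
  return { name: stepName };
}

type Handler = (input: unknown) => Promise<unknown>;

class ToyTransport {
  private readonly handlers = new Map<string, Handler>();

  register(stepName: string, handler: Handler): void {
    this.handlers.set(stepName, handler);
  }

  async dispatch(stepName: string, input: unknown): Promise<unknown> {
    const handler = this.handlers.get(stepName);
    if (!handler) throw new Error(`no handler registered for ${stepName}`);
    return handler(input);
  }
}

class ToyRemoteCtx {
  readonly checkpoints = new Map<string, unknown>();

  constructor(private readonly transport: ToyTransport) {}

  async remote<I, O>(def: RemoteStepDef<I, O>, input: I): Promise<O> {
    if (this.checkpoints.has(def.name)) return this.checkpoints.get(def.name) as O; // replayed
    const output = (await this.transport.dispatch(def.name, input)) as O; // no schema validation here
    this.checkpoints.set(def.name, output);
    return output;
  }
}

const chargeCard = defineRemoteStep<{ orderId: string; amountCents: number }, { chargeId: string }>(
  'payments.charge-card',
);

const transport = new ToyTransport();
let charges = 0;

// Plays the part of the @DurableStep('payments.charge-card') method.
transport.register('payments.charge-card', async (input) => {
  charges++;
  const { orderId } = input as { orderId: string };
  return { chargeId: `ch_${orderId}` };
});
```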

Fan-out (parallel steps)

Run steps concurrently with Promise.all. Checkpoints stay deterministic because each ctx.step call claims its position synchronously, before any await, so positions follow the order of the calls rather than the order in which the steps finish:

const [a, b] = await Promise.all([
  ctx.step('a', () => doA()),
  ctx.step('b', () => doB()),
]);
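Why the synchronous prefix matters can be shown with a toy context that takes a position counter before awaiting anything. In this hypothetical sketch (ToyFanOutCtx and delay are illustrative names), step b finishes first, yet a still owns position 0, so a replay maps each position back to the same step.

```typescript
// Hypothetical toy: step() claims its position synchronously, before awaiting the body.
class ToyFanOutCtx {
  private nextPosition = 0;
  readonly checkpoints: { position: number; stepName: string; result: unknown }[] = [];

  step<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
    const position = this.nextPosition++; // runs as soon as step() is called, in call order
    return fn().then((result) => {
      this.checkpoints.push({ position, stepName, result }); // pushed in completion order
      return result;
    });
  }
}

function delay<T>(ms: number, value: T): Promise<T> {
  return new Promise<T>((resolve) => setTimeout(() => resolve(value), ms));
}
```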

Fatal errors

Any error thrown from a step is retried up to the step's retry limit. For a business failure that no retry can fix, throw FatalError instead, which skips the remaining retries and fails the run immediately:

import { FatalError } from '@dudousxd/nestjs-durable-core';

await ctx.step('charge', async () => {
  const res = await stripe.charge(order);
  if (res.declined) throw new FatalError('card declined', 'declined');
  return res;
});
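The difference between an ordinary error and FatalError comes down to one check inside the retry loop. A hypothetical sketch: ToyFatalError stands in for the library's FatalError, treating its second argument as an error code is an assumption, and runWithRetries assumes retries counts extra attempts after the first.

```typescript
// Hypothetical toy: ToyFatalError stands in for FatalError; the retry loop is illustrative.
class ToyFatalError extends Error {
  // Assumption: the second constructor argument is a machine-readable code such as 'declined'.
  constructor(message: string, readonly code?: string) {
    super(message);
    this.name = 'ToyFatalError';
    Object.setPrototypeOf(this, new.target.prototype); // keeps instanceof working on older targets
  }
}

async function runWithRetries<T>(fn: () => Promise<T>, retries: number): Promise<{ result: T; attempts: number }> {
  let attempts = 0;
  for (;;) {
    attempts++;
    try {
      return { result: await fn(), attempts };
    } catch (err) {
      if (err instanceof ToyFatalError) throw err; // no retry can fix this: fail immediately
      if (attempts > retries) throw err; // retry budget exhausted
    }
  }
}
```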

Steps vs. sub-process events

These look similar in the dashboard but differ in one fundamental way: durability.

  • A step (ctx.step / ctx.remote) is a durable checkpoint. The engine records its result, so on a crash, retry, or replay it is not re-executed — the saved result is replayed. It's a first-class node in the run graph and the unit of recovery. Use a step for any unit that should survive a crash or not be redone.

  • A sub-process event is a log annotation emitted inside a step via the step logger's log.sub(name, status) — e.g. one entry per item in a fan-out the step performs internally. It is not durable on its own: it's metadata attached to the parent step's checkpoint. If the parent step retries, all of its sub-process events are produced again. Use it for visibility, not recovery.

// One durable step that internally processes N items and records each outcome for visibility.
await ctx.step('process-batch', async (log) => {
  for (const item of batch) {
    try {
      await handle(item);
      log.sub(item.id, 'ok'); // a sub-process event — shown under the step, not a checkpoint
    } catch (e) {
      log.sub(item.id, 'failed', String(e));
    }
  }
});
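The durability difference can be made concrete by counting log.sub calls. In this hypothetical sketch (ToySubEngine, StepLog and SubEvent are illustrative names), a batch step fails once after logging its items and then succeeds: the sub-process events are produced again on the retry, and none are produced on a replay of the completed step. Assumption, not stated on this page: each attempt starts with a fresh log and the checkpoint keeps the successful attempt's entries.

```typescript
// Hypothetical toy: sub() entries are attached to the parent step's checkpoint, not saved alone.
type SubEvent = { id: string; status: 'ok' | 'failed'; detail?: string };
type StepLog = { sub(id: string, status: SubEvent['status'], detail?: string): void };

class ToySubEngine {
  readonly checkpoints = new Map<string, { result: unknown; subs: SubEvent[] }>();
  subCallsEmitted = 0; // every sub() call across all attempts

  async step<T>(stepName: string, fn: (log: StepLog) => Promise<T>, retries = 0): Promise<T> {
    const saved = this.checkpoints.get(stepName);
    if (saved) return saved.result as T; // replay: the body and its sub events are skipped
    for (let attempt = 0; ; attempt++) {
      const subs: SubEvent[] = []; // assumption: each attempt starts with a fresh log
      const log: StepLog = {
        sub: (id, status, detail) => {
          this.subCallsEmitted++;
          subs.push({ id, status, detail });
        },
      };
      try {
        const result = await fn(log);
        this.checkpoints.set(stepName, { result, subs }); // sub events ride on the checkpoint
        return result;
      } catch (err) {
        if (attempt >= retries) throw err;
      }
    }
  }
}
```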

Rule of thumb: if you want each unit to retry or replay independently, make it a step (or a child workflow when it has its own internal steps you want to see). If you only want to see what happened inside one durable unit, emit sub-process events.
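For contrast, making each item its own step lets items recover independently. This hypothetical sketch (ToyItemCtx and processAll are illustrative names, with step names like item-i1 chosen for the example) fails on item i2 during the first run; on the second run, i1 is replayed from its checkpoint and only i2 and i3 are handled.

```typescript
// Hypothetical toy: one step per item, so each item's result is checkpointed on its own.
class ToyItemCtx {
  constructor(private readonly checkpoints: Map<string, unknown>) {}

  async step<T>(stepName: string, fn: () => Promise<T>): Promise<T> {
    if (this.checkpoints.has(stepName)) return this.checkpoints.get(stepName) as T; // replayed
    const result = await fn();
    this.checkpoints.set(stepName, result);
    return result;
  }
}

async function processAll(
  ctx: ToyItemCtx,
  items: string[],
  handle: (id: string) => Promise<string>,
): Promise<string[]> {
  const out: string[] = [];
  // Sequential for clarity; each item is an independent unit of recovery.
  for (const id of items) out.push(await ctx.step(`item-${id}`, () => handle(id)));
  return out;
}
```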

Tags

Label runs with tags to find them later. Static tags on the @Workflow apply to every run; per-run tags are added at start. Both are merged onto the run and are searchable in the dashboard.

@Workflow({ name: 'pipeline', version: '1', tags: ['etl', 'critical'] })
export class PipelineWorkflow {
  /* ... */
}

// per-run tags merge with the static ones → run.tags = ['etl', 'critical', 'nightly']
await this.workflows.start(PipelineWorkflow, input, runId, { tags: ['nightly'] });
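The merge in the comment above can be expressed as a one-line function. A hypothetical sketch: mergeTags is an illustrative name, and both the ordering (static tags first) and the de-duplication are assumptions; this page only shows the simple case without overlapping tags.

```typescript
// Hypothetical sketch of combining static @Workflow tags with per-run tags onto run.tags.
// Assumption: static tags come first and duplicates are dropped (Set keeps first-seen order).
function mergeTags(staticTags: readonly string[], runTags: readonly string[] = []): string[] {
  return Array.from(new Set(staticTags.concat(runTags)));
}
```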

The dashboard shows a run's tags in both the run list and the run detail view, and provides a tag filter box; clicking a tag filters the list by it. Programmatically, query by tag with RunQuery.tag, or pass the ?tag= query parameter to the dashboard API.
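Tag filtering amounts to a membership check on each run's tags, and the ?tag= parameter is an ordinary query string. A hypothetical sketch: RunSummary, filterByTag, runsUrl and the /runs path are illustrative, not the real RunQuery fields or dashboard API routes.

```typescript
// Hypothetical sketch of tag filtering; names and the '/runs' path are illustrative.
type RunSummary = { id: string; tags: string[] };

// A run matches when its tags include the requested tag.
function filterByTag(runs: RunSummary[], tag: string): RunSummary[] {
  return runs.filter((run) => run.tags.includes(tag));
}

// Builds a URL carrying the ?tag= query parameter; URLSearchParams handles the encoding.
function runsUrl(base: string, tag: string): string {
  const url = new URL('/runs', base);
  url.searchParams.set('tag', tag);
  return url.toString();
}
```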
