Queries & updates

Reading a live run's state with ctx.setEvent + engine.getEvent (side-effect-free queries), and steering it with ctx.onUpdate + engine.registerUpdateValidator + engine.update (validated, Temporal-style updates that can be rejected before they touch the run).

A run is durable but it is not opaque. While it is in flight — suspended on a sleep, waiting on a remote step, parked on a signal — you often need to read what it has done so far, and sometimes to steer it from the outside. The engine gives you two distinct primitives for that, mirroring Temporal's split between queries (read-only) and updates (mutating, validated):

  • Queries publish a named value from inside the run (ctx.setEvent) and read the latest one from outside (engine.getEvent) with no side effect on the run.
  • Updates expose a named update point inside the run (ctx.onUpdate) that suspends until an external engine.update delivers an argument — gated by an optional validator that can reject the update before it touches the run.

Queries

A query is a side-effect-free read of a run's state. The run publishes values; readers observe them. Neither side resumes, suspends, or otherwise perturbs the run.

Publishing a value — ctx.setEvent(key, value)

Inside the workflow body, call ctx.setEvent(key, value) to publish a named, queryable value. A reader sees the latest value for a given key; calling setEvent again with the same key overwrites it. Each call is checkpointed and replay-safe: on replay the publish is idempotent, so it never double-fires. Like any other step, it consumes one logical position.

@Workflow({ name: 'video-encode', version: '1' })
export class VideoEncodeWorkflow {
  constructor(private readonly encoder: EncoderService) {}

  async run(ctx: WorkflowCtx, job: EncodeJob) {
    await ctx.setEvent('progress', { phase: 'probing', pct: 0 });

    const segments = await ctx.step('probe', () => this.encoder.probe(job.src));

    for (let i = 0; i < segments.length; i++) {
      await ctx.remote(encodeSegment, { jobId: job.id, segment: segments[i] });
      // Publish progress after each segment — readers see the latest snapshot.
      await ctx.setEvent('progress', {
        phase: 'encoding',
        pct: Math.round(((i + 1) / segments.length) * 100),
        done: i + 1,
        total: segments.length,
      });
    }

    await ctx.setEvent('progress', { phase: 'done', pct: 100 });
    return { jobId: job.id, segments: segments.length };
  }
}

Use setEvent for anything an outside observer might want before the run completes: progress, a partial result, a human-readable status string, an intermediate id (e.g. a provider's job handle). Because each call overwrites the previous value for that key, you can publish a key as often as you like without growing unbounded state: a query only ever sees the latest value.
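
The overwrite semantics can be pictured with a small in-memory model. This is only an illustrative sketch: EventStore and its methods are hypothetical names, not the engine's storage, which keeps published values in the run's checkpoints.

```typescript
// Hypothetical in-memory model of "latest value per key wins".
// It mirrors the semantics of ctx.setEvent / engine.getEvent, not their implementation.
class EventStore {
  private readonly values = new Map<string, unknown>();

  private slot(runId: string, key: string): string {
    return JSON.stringify([runId, key]);
  }

  // Like ctx.setEvent: a later publish under the same key replaces the earlier one.
  publish(runId: string, key: string, value: unknown): void {
    this.values.set(this.slot(runId, key), value);
  }

  // Like engine.getEvent: a pure lookup; undefined when the key was never published.
  read<T>(runId: string, key: string): T | undefined {
    return this.values.get(this.slot(runId, key)) as T | undefined;
  }

  // Number of stored entries, to show that repeated publishes do not accumulate.
  get size(): number {
    return this.values.size;
  }
}

const store = new EventStore();
for (let pct = 0; pct <= 100; pct += 10) {
  store.publish('run-1', 'progress', { phase: 'encoding', pct });
}
```

After the loop the model holds a single entry for 'progress', and a read returns the last snapshot published.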

Reading a value — engine.getEvent(runId, key)

From outside the run — a controller, a polling endpoint, another service — read the latest published value with engine.getEvent(runId, key). It returns the most recent value for that key, or undefined if the run never published it. Crucially, the read has no effect on the run: it does not resume a suspended run, does not consume a logical position, and does not appear in the run's history. This is the suspend-model counterpart of a Temporal query.

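import { Controller, Get, Param } from '@nestjs/common';
// WorkflowEngine comes from the engine package.

@Controller('jobs')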
export class JobsController {
  constructor(private readonly engine: WorkflowEngine) {}

  @Get(':runId/progress')
  async progress(@Param('runId') runId: string) {
    const progress = await this.engine.getEvent<{ phase: string; pct: number }>(runId, 'progress');
    if (!progress) return { phase: 'pending', pct: 0 };
    return progress;
  }
}

getEvent works against a live run and a finished one — the published values live in the run's checkpoints, so they remain queryable after the run completes (useful for surfacing a final summary without unpacking the full output). The read is typed: pass the value type as the generic (getEvent<TValue>), and the engine returns TValue | undefined.
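
Because the read never disturbs the run, a caller can poll it freely. The helper below is a minimal sketch of that pattern: EventReader is a structural stand-in that assumes only the getEvent signature described above, and waitForPhase, intervalMs and maxAttempts are hypothetical names introduced for illustration.

```typescript
// Structural stand-in for the engine; only the getEvent method described above is assumed.
interface EventReader {
  getEvent<T>(runId: string, key: string): Promise<T | undefined>;
}

interface Progress {
  phase: string;
  pct: number;
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls the 'progress' key until it reports the wanted phase or the attempts run out.
// Returns the last snapshot seen, or undefined if nothing was ever published.
async function waitForPhase(
  reader: EventReader,
  runId: string,
  phase: string,
  opts: { intervalMs: number; maxAttempts: number },
): Promise<Progress | undefined> {
  let last: Progress | undefined;
  for (let attempt = 1; attempt <= opts.maxAttempts; attempt++) {
    last = await reader.getEvent<Progress>(runId, 'progress');
    if (last?.phase === phase) return last;
    // Pause between reads, but not after the final attempt.
    if (attempt < opts.maxAttempts) await sleep(opts.intervalMs);
  }
  return last;
}
```

In an application the reader argument would be the injected engine, for example waitForPhase(this.engine, runId, 'done', { intervalMs: 1000, maxAttempts: 60 }).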

The dashboard exposes this read over HTTP as GET runs/:id/events/:key, so the UI (or any HTTP client) can poll a run's published progress without touching the engine API directly.

Updates

A query reads; an update steers. An update point inside the run suspends until an external caller delivers an argument — and, unlike a raw signal, an update can be validated and rejected before it ever touches the run. That validator is the differentiator: it runs in the caller's request, so a rejected update returns a reason synchronously and leaves the run exactly as it was.

The update point — ctx.onUpdate(name, { timeoutMs? })

Inside the workflow, await ctx.onUpdate(name) suspends the run with zero compute until an engine.update(runId, name, arg) delivers arg, then resumes with it. The name is run-scoped — it only needs to be unique within this run, not globally — so an approval flow can simply name its update point 'decision'. Pass { timeoutMs } to bound the wait; if the deadline passes before an update arrives, the call throws SignalTimeoutError, which you can catch in the workflow to take a default branch.

@Workflow({ name: 'expense-approval', version: '1' })
export class ExpenseApprovalWorkflow {
  constructor(private readonly ledger: LedgerService) {}

  async run(ctx: WorkflowCtx, expense: Expense) {
    await ctx.setEvent('status', { state: 'awaiting-approval', amountCents: expense.amountCents });

    let decision: { approved: boolean; approver: string; note?: string };
    try {
      // Suspends here — no compute consumed — until engine.update delivers a decision, or 7 days pass.
      decision = await ctx.onUpdate('decision', { timeoutMs: 7 * 24 * 60 * 60 * 1000 });
    } catch (err) {
      if (err instanceof SignalTimeoutError) {
        await ctx.setEvent('status', { state: 'expired' });
        throw new FatalError('approval timed out', 'expired');
      }
      throw err;
    }

    if (!decision.approved) {
      await ctx.setEvent('status', { state: 'rejected', by: decision.approver });
      return { reimbursed: false };
    }

    await ctx.step('reimburse', () => this.ledger.reimburse(expense, decision.approver));
    await ctx.setEvent('status', { state: 'reimbursed', by: decision.approver });
    return { reimbursed: true };
  }
}

The validator — engine.registerUpdateValidator(workflow, name, validate)

Register a validator for a (workflow, update name) pair. It runs before the update is delivered, in the calling request, so it can enforce a business rule and reject the update without ever disturbing the run. To reject, either throw or return a non-empty reason string; to accept, return nothing (void). The validator may be async, so it can check a database, call out to an authz service, or re-validate the argument shape.

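import { Injectable, OnModuleInit } from '@nestjs/common';
// WorkflowEngine comes from the engine package.

@Injectable()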
export class ExpenseApprovalValidators implements OnModuleInit {
  constructor(
    private readonly engine: WorkflowEngine,
    private readonly people: PeopleService,
  ) {}

  onModuleInit() {
    this.engine.registerUpdateValidator<{ approved: boolean; approver: string; note?: string }>(
      'expense-approval',
      'decision',
      async (arg) => {
        const approver = await this.people.find(arg.approver);
        if (!approver) return `unknown approver "${arg.approver}"`;
        if (!approver.canApproveExpenses) return `${arg.approver} is not allowed to approve expenses`;
        if (!arg.approved && !arg.note) return 'a rejection must include a note';
        // returning nothing accepts the update
      },
    );
  }
}

Only one validator exists per (workflow, name) pair; registering again replaces the previous one. If no validator is registered, every update is accepted and delivered as-is.
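
The validator contract (throw or return a non-empty string to reject, return nothing to accept, one validator per pair) can be modelled in a few lines. This is a sketch under stated assumptions, not the engine's code: registerValidator, check and Verdict are hypothetical names, and treating an empty returned string as acceptance is this sketch's own reading of "non-empty reason string".

```typescript
// A validator rejects by throwing or by returning a non-empty string; returning nothing accepts.
type UpdateValidator<TArg> = (arg: TArg) => void | string | Promise<void | string>;

type Verdict = { accepted: true } | { accepted: false; reason: string };

// Hypothetical registry keyed by (workflow, update name); registering again replaces the entry.
const validators = new Map<string, UpdateValidator<any>>();

function registerValidator<TArg>(workflow: string, name: string, validate: UpdateValidator<TArg>): void {
  validators.set(JSON.stringify([workflow, name]), validate);
}

// Runs the registered validator, if any, and folds its possible outcomes into one verdict.
async function check(workflow: string, name: string, arg: unknown): Promise<Verdict> {
  const validate = validators.get(JSON.stringify([workflow, name]));
  if (!validate) return { accepted: true }; // no validator registered: accept as-is
  try {
    const outcome = await validate(arg);
    if (typeof outcome === 'string' && outcome.length > 0) {
      return { accepted: false, reason: outcome }; // rejected by returned reason
    }
    return { accepted: true };
  } catch (err) {
    // rejected by throwing: the thrown message becomes the reason
    return { accepted: false, reason: err instanceof Error ? err.message : String(err) };
  }
}
```

Folding both rejection styles into a single verdict keeps the delivery path simple: it only has to look at accepted before deciding whether to wake the run.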

Delivering an update — engine.update(runId, name, arg)

From outside, call engine.update(runId, name, arg). It runs the validator first, then (if accepted) delivers arg to the suspended ctx.onUpdate and resumes the run. The return type makes the outcome explicit:

type UpdateResult =
  | { accepted: false; reason: string }
  | { accepted: true; run: RunResult | null };

  • { accepted: false, reason }: the validator rejected it (or the run wasn't found). The run is untouched; reason is the validator's thrown message or returned string.
  • { accepted: true, run }: accepted and delivered. run is the resumed run's result, or null when nothing was waiting at that update point yet (a too-early or duplicate update).

import { BadRequestException, Body, Controller, Param, Post } from '@nestjs/common';
// WorkflowEngine comes from the engine package.

@Controller('expenses')
export class ExpensesController {
  constructor(private readonly engine: WorkflowEngine) {}

  @Post(':runId/decision')
  async decide(@Param('runId') runId: string, @Body() body: DecisionDto) {
    const result = await this.engine.update(runId, 'decision', body);
    if (!result.accepted) {
      throw new BadRequestException(result.reason);
    }
    return { ok: true, status: result.run?.status ?? 'pending' };
  }
}

The dashboard exposes this over HTTP as POST runs/:id/updates/:name, with the request body becoming arg — so the same validated path is available to any HTTP client, not just code holding the engine.
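
The result of engine.update has three distinct cases: rejected, accepted with nothing waiting, and accepted and delivered. A caller may want to handle each one explicitly. The sketch below narrows the union to do that; RunResult is reduced to a status field (the only property the text above relies on), and classify and Outcome are hypothetical names.

```typescript
// Minimal stand-in: only the status field used in the controller above is assumed.
interface RunResult {
  status: string;
}

type UpdateResult =
  | { accepted: false; reason: string }
  | { accepted: true; run: RunResult | null };

type Outcome =
  | { kind: 'rejected'; reason: string }
  | { kind: 'not-waiting' }
  | { kind: 'delivered'; status: string };

// Narrows the union so that each of the three cases is handled explicitly.
function classify(result: UpdateResult): Outcome {
  if (!result.accepted) {
    return { kind: 'rejected', reason: result.reason };
  }
  if (result.run === null) {
    // Accepted, but no update point was waiting yet (too-early or duplicate update).
    return { kind: 'not-waiting' };
  }
  return { kind: 'delivered', status: result.run.status };
}
```

Separating the not-waiting case lets a caller decide whether a too-early or duplicate update should be reported back to the client rather than silently treated as success.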

Why a validator beats a raw signal

You can already steer a run with ctx.waitForSignal + engine.signal. The difference is where the rejection happens. A signal always lands; if the argument is bad, the workflow body has to detect it after resuming and somehow get the run back to a waiting state. An update with a validator rejects in the caller's request, synchronously, with a reason — the run never wakes for a bad update, and the caller gets immediate, actionable feedback. That is the Temporal-style "update with validator" guarantee, here in the suspend model.
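
The contrast can be made concrete with a toy model. Nothing here is engine code: ParkedRun, deliverSignal and deliverUpdate are hypothetical names, and the model only counts how often a parked run is woken under each delivery style.

```typescript
// Toy model of a parked run: counts wake-ups and records what it received.
class ParkedRun<TArg> {
  wakeups = 0;
  received: TArg[] = [];

  wake(arg: TArg): void {
    this.wakeups += 1;
    this.received.push(arg);
  }
}

type Decision = { approved: boolean; note?: string };

// Returns a rejection reason, or undefined to accept.
const validateDecision = (d: Decision): string | undefined =>
  !d.approved && !d.note ? 'a rejection must include a note' : undefined;

// Signal-style delivery: the argument always lands, so the run has to cope with bad input.
function deliverSignal(run: ParkedRun<Decision>, arg: Decision): void {
  run.wake(arg);
}

// Update-style delivery: validate in the caller first; the run wakes only for accepted input.
function deliverUpdate(
  run: ParkedRun<Decision>,
  arg: Decision,
): { accepted: true } | { accepted: false; reason: string } {
  const reason = validateDecision(arg);
  if (reason) return { accepted: false, reason };
  run.wake(arg);
  return { accepted: true };
}

const bad: Decision = { approved: false }; // a rejection without the required note
const signalled = new ParkedRun<Decision>();
const updated = new ParkedRun<Decision>();
deliverSignal(signalled, bad);
const verdict = deliverUpdate(updated, bad);
```

With the same bad argument, the signalled run has been woken once and now holds invalid input, while the updated run is still parked and the caller holds the rejection reason.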
