Queries & updates
Reading a live run's state with ctx.setEvent + engine.getEvent (side-effect-free queries), and steering it with ctx.onUpdate + engine.registerUpdateValidator + engine.update (validated, Temporal-style updates that can be rejected before they touch the run).
A run is durable but it is not opaque. While it is in flight — suspended on a sleep, waiting on a remote step, parked on a signal — you often need to read what it has done so far, and sometimes to steer it from the outside. The engine gives you two distinct primitives for that, mirroring Temporal's split between queries (read-only) and updates (mutating, validated):
- Queries publish a named value from inside the run (ctx.setEvent) and read the latest one from outside (engine.getEvent) with no side effect on the run.
- Updates expose a named update point inside the run (ctx.onUpdate) that suspends until an external engine.update delivers an argument, gated by an optional validator that can reject the update before it touches the run.
Queries
A query is a side-effect-free read of a run's state. The run publishes values; readers observe them. Neither side resumes, suspends, or otherwise perturbs the run.
Publishing a value — ctx.setEvent(key, value)
Inside the workflow body, call ctx.setEvent(key, value) to publish a named, queryable value. The
latest value for a given key is what a reader sees; calling setEvent again with the same key
overwrites it. It is checkpointed and replay-safe — on replay the publish is idempotent, so it never
double-fires — and it consumes one logical position like any other step.
@Workflow({ name: 'video-encode', version: '1' })
export class VideoEncodeWorkflow {
  constructor(private readonly encoder: EncoderService) {}
  async run(ctx: WorkflowCtx, job: EncodeJob) {
    await ctx.setEvent('progress', { phase: 'probing', pct: 0 });
    const segments = await ctx.step('probe', () => this.encoder.probe(job.src));
    for (let i = 0; i < segments.length; i++) {
      await ctx.remote(encodeSegment, { jobId: job.id, segment: segments[i] });
      // Publish progress after each segment; readers see the latest snapshot.
      await ctx.setEvent('progress', {
        phase: 'encoding',
        pct: Math.round(((i + 1) / segments.length) * 100),
        done: i + 1,
        total: segments.length,
      });
    }
    await ctx.setEvent('progress', { phase: 'done', pct: 100 });
    return { jobId: job.id, segments: segments.length };
  }
}
Use setEvent for anything an outside observer might want before the run completes: progress, a
partial result, a human-readable status string, an intermediate id (e.g. a provider's job handle).
Because each call overwrites the previous value for that key, you can publish a key as often as you
like without growing unbounded state — only the latest survives a query.
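The publish-and-overwrite behaviour can be pictured with a small in-memory model. This is only a sketch of the semantics described above, not the engine's storage: EventStore and its methods are hypothetical names, and the real engine keeps these values in the run's checkpoints rather than in a Map.

```typescript
// Hypothetical in-memory model of per-run published values.
// It only illustrates "latest value per key wins"; it is not the engine's code.
class EventStore {
  private readonly values = new Map<string, Map<string, unknown>>();

  // Models ctx.setEvent: overwrite whatever is stored under (runId, key).
  set(runId: string, key: string, value: unknown): void {
    let perRun = this.values.get(runId);
    if (!perRun) {
      perRun = new Map<string, unknown>();
      this.values.set(runId, perRun);
    }
    perRun.set(key, value);
  }

  // Models engine.getEvent: latest value, or undefined if never published.
  get<T>(runId: string, key: string): T | undefined {
    return this.values.get(runId)?.get(key) as T | undefined;
  }

  // Number of values retained for a run: one per distinct key.
  size(runId: string): number {
    return this.values.get(runId)?.size ?? 0;
  }
}

const store = new EventStore();
for (let pct = 0; pct <= 100; pct += 25) {
  store.set('run-1', 'progress', { phase: 'encoding', pct });
}
// Five publishes, one retained value: the last snapshot, with pct 100.
```

In this model, publishing 'progress' five times leaves a single entry for the run, which is why a key can be published as often as needed.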
Reading a value — engine.getEvent(runId, key)
From outside the run — a controller, a polling endpoint, another service — read the latest published
value with engine.getEvent(runId, key). It returns the most recent value for that key, or
undefined if the run never published it. Crucially, the read has no effect on the run: it does
not resume a suspended run, does not consume a logical position, and does not appear in the run's
history. This is the suspend-model counterpart of a Temporal query.
@Controller('jobs')
export class JobsController {
  constructor(private readonly engine: WorkflowEngine) {}
  @Get(':runId/progress')
  async progress(@Param('runId') runId: string) {
    const progress = await this.engine.getEvent<{ phase: string; pct: number }>(runId, 'progress');
    if (!progress) return { phase: 'pending', pct: 0 };
    return progress;
  }
}
getEvent works against both a live run and a finished one: the published values live in the run's
checkpoints, so they remain queryable after the run completes (useful for surfacing a final summary
without unpacking the full output). The read is typed: pass the value type as the generic
(getEvent<TValue>), and the engine returns TValue | undefined.
The dashboard exposes this read over HTTP as GET runs/:id/events/:key, so the UI (or any HTTP
client) can poll a run's published progress without touching the engine API directly.
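Because a read never perturbs the run, a client can poll as often as it likes. A minimal polling sketch follows; pollUntil and its options are hypothetical helpers, not part of the engine, and the read callback stands in for either engine.getEvent or an HTTP GET against runs/:id/events/:key.

```typescript
// Hypothetical polling helper. `read` is whatever fetches the latest published
// value (engine.getEvent or an HTTP call); it must resolve to undefined when
// nothing has been published yet.
async function pollUntil<T>(
  read: () => Promise<T | undefined>,
  isDone: (value: T) => boolean,
  opts: { intervalMs: number; maxAttempts: number },
): Promise<T | undefined> {
  let latest: T | undefined;
  for (let attempt = 0; attempt < opts.maxAttempts; attempt++) {
    latest = await read();
    if (latest !== undefined && isDone(latest)) return latest;
    // Wait between reads, but not after the final attempt.
    if (attempt < opts.maxAttempts - 1) {
      await new Promise<void>((resolve) => setTimeout(resolve, opts.intervalMs));
    }
  }
  // Gave up: return the last snapshot seen, which may be undefined.
  return latest;
}
```

A caller would pass something like `() => engine.getEvent(runId, 'progress')` as read and `(p) => p.phase === 'done'` as isDone. The attempt cap keeps a client from polling forever against a run that never finishes.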
Updates
A query reads; an update steers. An update point inside the run suspends until an external caller delivers an argument — and, unlike a raw signal, an update can be validated and rejected before it ever touches the run. That validator is the differentiator: it runs in the caller's request, so a rejected update returns a reason synchronously and leaves the run exactly as it was.
The update point — ctx.onUpdate(name, { timeoutMs? })
Inside the workflow, await ctx.onUpdate(name) suspends the run with zero compute until an
engine.update(runId, name, arg) delivers arg, then resumes with it. The name is run-scoped
— it only needs to be unique within this run, not globally — so an approval flow can simply name its
update point 'decision'. Pass { timeoutMs } to bound the wait; if the deadline passes before an
update arrives, the call throws SignalTimeoutError, which you can catch in the workflow to take a
default branch.
@Workflow({ name: 'expense-approval', version: '1' })
export class ExpenseApprovalWorkflow {
  constructor(private readonly ledger: LedgerService) {}
  async run(ctx: WorkflowCtx, expense: Expense) {
    await ctx.setEvent('status', { state: 'awaiting-approval', amountCents: expense.amountCents });
    let decision: { approved: boolean; approver: string; note?: string };
    try {
      // Suspends here (no compute consumed) until engine.update delivers a decision, or 7 days pass.
      decision = await ctx.onUpdate('decision', { timeoutMs: 7 * 24 * 60 * 60 * 1000 });
    } catch (err) {
      if (err instanceof SignalTimeoutError) {
        await ctx.setEvent('status', { state: 'expired' });
        throw new FatalError('approval timed out', 'expired');
      }
      throw err;
    }
    if (!decision.approved) {
      await ctx.setEvent('status', { state: 'rejected', by: decision.approver });
      return { reimbursed: false };
    }
    await ctx.step('reimburse', () => this.ledger.reimburse(expense, decision.approver));
    await ctx.setEvent('status', { state: 'reimbursed', by: decision.approver });
    return { reimbursed: true };
  }
}
The validator — engine.registerUpdateValidator(workflow, name, validate)
Register a validator for a (workflow, update name) pair. It runs before the update is
delivered, in the calling request — so it can enforce a business rule and reject the update
without ever disturbing the run. To reject, either throw or return a non-empty reason string; to
accept, return nothing (void). The validator may be async, so it can check a database, call out to
an authz service, or re-validate the argument shape.
@Injectable()
export class ExpenseApprovalValidators implements OnModuleInit {
  constructor(
    private readonly engine: WorkflowEngine,
    private readonly people: PeopleService,
  ) {}
  onModuleInit() {
    this.engine.registerUpdateValidator<{ approved: boolean; approver: string; note?: string }>(
      'expense-approval',
      'decision',
      async (arg) => {
        const approver = await this.people.find(arg.approver);
        if (!approver) return `unknown approver "${arg.approver}"`;
        if (!approver.canApproveExpenses) return `${arg.approver} is not allowed to approve expenses`;
        if (!arg.approved && !arg.note) return 'a rejection must include a note';
        // returning nothing accepts the update
      },
    );
  }
}
Only one validator lives per (workflow, name); registering again replaces it. If no validator is
registered, every update is accepted and delivered as-is.
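The validator contract has three possible outcomes: throw, return a reason, or return nothing. The sketch below shows one way those outcomes could be folded into a single verdict. It is an illustration of the rules stated above, not the engine's internals: UpdateValidator, Verdict and runValidator are hypothetical names, and treating an empty string as acceptance is an assumption drawn from the "non-empty reason string" wording.

```typescript
// Validator shape as described in the text: sync or async, returns a reason
// string to reject or nothing to accept. The type name is hypothetical.
type UpdateValidator<TArg> = (arg: TArg) => void | string | Promise<void | string>;

type Verdict = { accepted: true } | { accepted: false; reason: string };

// Hypothetical helper that folds throw / reason string / nothing into a verdict.
async function runValidator<TArg>(
  validate: UpdateValidator<TArg> | undefined,
  arg: TArg,
): Promise<Verdict> {
  // No validator registered: every update is accepted as-is.
  if (!validate) return { accepted: true };
  try {
    const reason = await validate(arg);
    // Assumption: only a non-empty string counts as a rejection.
    if (typeof reason === 'string' && reason.length > 0) {
      return { accepted: false, reason };
    }
    return { accepted: true };
  } catch (err) {
    // A thrown error rejects the update; its message becomes the reason.
    return { accepted: false, reason: err instanceof Error ? err.message : String(err) };
  }
}
```

Both rejection paths end in the same shape, so a caller does not need to care whether the validator threw or returned a string.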
Delivering an update — engine.update(runId, name, arg)
From outside, call engine.update(runId, name, arg). It runs the validator first, then (if accepted)
delivers arg to the suspended ctx.onUpdate and resumes the run. The return type makes the outcome
explicit:
type UpdateResult =
  | { accepted: false; reason: string }
  | { accepted: true; run: RunResult | null };
- { accepted: false, reason }: the validator rejected it (or the run wasn't found). The run is untouched; reason is the validator's thrown message or returned string.
- { accepted: true, run }: accepted and delivered. run is the resumed run's result, or null when nothing was waiting at that update point yet (a too-early or duplicate update).
@Controller('expenses')
export class ExpensesController {
  constructor(private readonly engine: WorkflowEngine) {}
  @Post(':runId/decision')
  async decide(@Param('runId') runId: string, @Body() body: DecisionDto) {
    const result = await this.engine.update(runId, 'decision', body);
    if (!result.accepted) {
      throw new BadRequestException(result.reason);
    }
    return { ok: true, status: result.run?.status ?? 'pending' };
  }
}
The dashboard exposes this over HTTP as POST runs/:id/updates/:name, with the request body becoming
arg, so the same validated path is available to any HTTP client, not just code holding the engine.
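A caller that wants to tell all three outcomes of engine.update apart, rather than collapsing the null case into a default status, could branch on the result as in the sketch below. The UpdateResult shape is copied from the type shown earlier. RunResult is reduced to a single status field, which is an assumption, and Outcome and describeUpdate are hypothetical names.

```typescript
// RunResult is trimmed to the one field this sketch needs (assumption).
type RunResult = { status: string };

type UpdateResult =
  | { accepted: false; reason: string }
  | { accepted: true; run: RunResult | null };

// Hypothetical caller-side view of the three outcomes.
type Outcome =
  | { kind: 'rejected'; reason: string }
  | { kind: 'delivered'; status: string }
  | { kind: 'not-waiting' };

function describeUpdate(result: UpdateResult): Outcome {
  // Rejected by the validator (or run not found): the run was not touched.
  if (!result.accepted) return { kind: 'rejected', reason: result.reason };
  // Accepted, but nothing was suspended at that update point yet.
  if (result.run === null) return { kind: 'not-waiting' };
  // Accepted and delivered: the run resumed and produced a result.
  return { kind: 'delivered', status: result.run.status };
}
```

Keeping 'not-waiting' separate lets an API tell the client that the update was valid but arrived too early or was a duplicate.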
Why a validator beats a raw signal
You can already steer a run with ctx.waitForSignal + engine.signal. The difference is where the
rejection happens. A signal always lands; if the argument is bad, the workflow body has to detect it
after resuming and somehow get the run back to a waiting state. An update with a validator rejects in
the caller's request, synchronously, with a reason — the run never wakes for a bad update, and the
caller gets immediate, actionable feedback. That is the Temporal-style "update with validator"
guarantee, here in the suspend model.
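To make the cost of the raw-signal approach concrete, the sketch below shows the re-check loop a workflow body would need. Everything here is illustrative: SignalCtx is a hypothetical stand-in whose waitForSignal signature is assumed (the real one may differ, including whether a signal name can be reused in a loop), and fakeCtx only simulates queued payloads.

```typescript
// Hypothetical minimal stand-in for the workflow context (assumed signature).
interface SignalCtx {
  waitForSignal<T>(name: string): Promise<T>;
}

type Decision = { approved: boolean; approver: string; note?: string };

// The same business rule the validator example enforces.
function rejectionReason(d: Decision): string | undefined {
  return !d.approved && !d.note ? 'a rejection must include a note' : undefined;
}

// Raw-signal style: every payload lands, so the workflow body must check it
// and go back to waiting itself. wakeUps counts how often the run resumed.
async function awaitDecisionViaSignal(
  ctx: SignalCtx,
): Promise<{ decision: Decision; wakeUps: number }> {
  let wakeUps = 0;
  for (;;) {
    const decision = await ctx.waitForSignal<Decision>('decision');
    wakeUps++;
    if (rejectionReason(decision) === undefined) return { decision, wakeUps };
    // Bad payload: the sender has already been told "ok", so just wait again.
  }
}

// Fake context that hands out queued payloads in order, for demonstration only.
function fakeCtx(queue: Decision[]): SignalCtx {
  return {
    async waitForSignal<T>(_name: string): Promise<T> {
      const next = queue.shift();
      if (next === undefined) throw new Error('no more signals queued');
      return next as unknown as T;
    },
  };
}
```

With one bad payload followed by a good one, this loop resumes the run twice and the first sender never learns why the payload was ignored. With ctx.onUpdate plus a validator, the bad payload is rejected in the caller's request and the run resumes only once.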