Aviary
Recipes

Custom watcher

Capture a source Telescope doesn't ship a watcher for — a tiny WebSocket-events watcher built on ctx.record(), and the real instrument(emit, ctx) escape hatch for a bespoke cache, both correlated to the request that caused them.

A watcher is a class that implements Watcher and wires a framework hook in register(). When it records inside the caller's async context, the entry lands in the right batch automatically — the request (or job) it caused. This recipe shows the bare ctx.record() path and the real instrument(emit, ctx) escape hatch a shipped watcher exposes.

See Capture & correlation for how batches and async context turn scattered events into one navigable flow.


The contract

From watcher.ts:

interface Watcher {
  readonly type: string;                     // the entry `type` this watcher produces
  register(ctx: WatcherContext): void | Promise<void>; // wire hooks, once at module init
  shouldRecord?(candidate: unknown): boolean; // optional cheap pre-filter
}

interface WatcherContext {
  record(input: RecordInput): void;          // fire-and-forget, never throws/blocks
  runInBatch<T>(origin: BatchOrigin, fn: () => Promise<T>): Promise<T>;
  beginBatch(origin: BatchOrigin): BatchHandle;
  readonly config: ResolvedCoreConfig;
  readonly moduleRef: ModuleRef;             // resolve providers from the Nest container
}

A RecordInput is { type, content, familyHash?, tags?, durationMs?, startedAt? }. Everything else (id, batchId, sequence, traceId, ...) is enriched by the recorder.


Example A — a WebSocket-events watcher

Captures every message a Socket.IO gateway emits, correlated to the request/job active when it fired:

import {
  type RecordInput,
  type Watcher,
  type WatcherContext,
} from '@dudousxd/nestjs-telescope';
import type { Server } from 'socket.io';

interface WsEventContent {
  event: string;
  room: string | null;
  payloadBytes: number;
}

export class WebSocketWatcher implements Watcher {
  readonly type = 'websocket';

  constructor(private readonly io: Server) {}

  register(ctx: WatcherContext): void {
    // Wrap the server's emit so each outbound event is recorded in the
    // caller's async context — no batch is opened here.
    const original = this.io.emit.bind(this.io);
    this.io.emit = (event: string, ...args: unknown[]): boolean => {
      this.safeRecord(ctx, {
        event,
        room: null,
        payloadBytes: Buffer.byteLength(JSON.stringify(args ?? [])),
      });
      return original(event, ...args);
    };
  }

  // Never let a recording failure change the host's behavior.
  private safeRecord(ctx: WatcherContext, content: WsEventContent): void {
    try {
      const input: RecordInput = {
        type: this.type,
        familyHash: `ws:${content.event}`, // groups identical events together
        content,
        tags: [`ws:${content.event}`],
      };
      ctx.record(input);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`WebSocketWatcher: failed to record: ${message}`);
    }
  }
}
TelescopeModule.forRoot({
  watchers: [new WebSocketWatcher(myIoServer)],
});

How it works

  • register() runs once at module init. Wire your hook (here, wrapping emit) and return.
  • ctx.record(input) is fire-and-forget — it never throws and never blocks. You don't await it.
  • Correlation is automatic. Because record() is called synchronously inside the caller's async context (the active request/job ALS scope), the entry is stamped with that batch's id. You open no batch — only entry-point watchers (HTTP, queues) do that with runInBatch / beginBatch.
  • familyHash groups like-with-like in the UI: every chat:message event shares a family, so the dashboard can show "this event, 4,000 times".

Never break the host

A watcher instruments someone else's code path. Wrap every record in a try/catch (the safeRecord pattern) so a Telescope bug can never alter — or crash — the operation you're observing. The shipped watchers all do this; ctx.record itself already swallows recorder-side failures, but your content-building code (the JSON.stringify above) runs on the host's thread, so guard it too.


Example B — the real escape hatch: instrument a custom cache

Some shipped watchers expose an instrument(emit, ctx) hook so a host can wire a cache's native events instead of monkey-patching get/set. This is the exact, shipped CustomCacheSource API from cache.watcher.ts — use it for a cache that isn't a cache-manager-style Cache (e.g. BentoCache).

import { CacheWatcher, type CacheEventInput } from '@dudousxd/nestjs-telescope-cache';
import type { WatcherContext } from '@dudousxd/nestjs-telescope';
import { BentoCache } from 'bentocache';

new CacheWatcher({
  // Called exactly once at register(), with:
  //  - emit: records a cache entry in the active request/job batch
  //  - ctx:  the WatcherContext (resolve your cache, subscribe to its events)
  instrument: (emit: (event: CacheEventInput) => void, ctx: WatcherContext) => {
    const cache = ctx.moduleRef.get(BentoCache, { strict: false });

    cache.on('cache:hit', ({ key }) =>
      emit({ operation: 'get', key, hit: true }),
    );
    cache.on('cache:miss', ({ key }) =>
      emit({ operation: 'get', key, hit: false }),
    );
    cache.on('cache:written', ({ key }) =>
      emit({ operation: 'set', key, hit: null }),
    );
  },
});
TelescopeModule.forRoot({
  watchers: [
    new CacheWatcher({
      instrument: (emit, ctx) => {
        /* ...as above... */
      },
    }),
  ],
});

How it works

  • instrument runs once at register(). Resolve your cache from ctx.moduleRef.get(...) (use { strict: false } so it finds providers in any module) and subscribe to its native events.
  • emit(event) is the correlated record. It fires inside the cache's own callback — which runs in the caller's async context — so each CacheEventInput lands in the request/job batch that issued the cache op, with the same family-hash, tags, and error-swallowing as the auto-patch path. The host owns the subscription; the watcher patches nothing.
  • hit is true/false for reads, null (or omitted) for writes.

The takeaway pattern: when you ship (or fork) a watcher, expose an instrument(emit, ctx) constructor option so a community user can wire any backend's events through emit — Telescope never has to special-case every library.

On this page