Aviary
Recipes

Custom storage adapter

Persist Telescope entries in a store no shipped adapter covers — implement the StorageProvider SPI end to end, with the keyset-pagination contract and the optional RollupStore add-on.

A storage adapter is a single class implementing StorageProvider. The API, dashboard, and pruner only ever touch your store through this interface — so once it compiles and honors the keyset-pagination contract, every screen works against it. This recipe builds a complete MongoDB-backed provider; the Mongo calls are clearly-marked sketches, but every Telescope-facing line is exact.

See Storage for the design rationale; this page is the implementation.


The contract

Read the full SPI from storage-provider.ts:

/**
 * The storage SPI. The API, dashboard, and pruner reach a store only through
 * these methods, so an adapter is complete once it implements them and honors
 * the keyset-pagination contract of `get()`.
 */
interface StorageProvider {
  /** Persist a batch of entries; every Entry field must be stored. */
  store(entries: Entry[]): Promise<void>;
  /** Patch an already-stored entry in place. `id` is immutable — ignore it if present in `patch`. */
  update(id: string, patch: Partial<Entry>): Promise<void>;
  /** Look up one entry by id; resolves to null when it does not exist. */
  find(id: string): Promise<EntryWithBatch | null>;
  /**
   * List entries newest-first (createdAt DESC, id DESC). When a cursor is
   * given, return only entries strictly older than the cursor position —
   * never an offset/skip. `nextCursor` is null when there is no next page.
   */
  get(query: EntryQuery): Promise<Page<Entry>>;
  /** All entries that share `batchId`. */
  batch(batchId: string): Promise<Entry[]>;
  /** Tag usage counts, optionally restricted to tags starting with `prefix`. */
  tags(prefix?: string): Promise<TagCount[]>;
  /**
   * Delete entries created before `olderThan`; with `keepLast`, the newest N
   * entries survive even if they are older than the cutoff. The example
   * adapter on this page resolves to the number of entries deleted.
   */
  prune(olderThan: Date, keepLast?: number): Promise<number>;
  /** Remove every stored entry. */
  clear(): Promise<void>;
  // Awaited at boot before any other method is called.
  init?(): void | Promise<void>;   // optional — acquire resources / ensure schema
  // Runs once at shutdown, after the final flush.
  close?(): void | Promise<void>;  // optional — release OWNED resources
}

An Entry is { id, batchId, type, familyHash, content, tags, sequence, durationMs, origin, instanceId, traceId, spanId, createdAt } — persist all of it; the dashboard reads every field.


Full example — a Mongo-backed provider

import {
  type Entry,
  type EntryQuery,
  type EntryWithBatch,
  type Page,
  type StorageProvider,
  type TagCount,
  encodeCursor,
  decodeCursor,
} from '@dudousxd/nestjs-telescope';
import { type Collection, type Db, type Filter, MongoClient } from 'mongodb';

/**
 * Escapes every regular-expression metacharacter in `text` so the result can
 * be embedded in a `$regex` pattern and matched literally.
 */
function escapeRegexLiteral(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

/**
 * MongoDB-backed StorageProvider.
 *
 * Entries live in the `telescope_entries` collection, keyed by the Telescope
 * `id` field (Mongo's own `_id` is projected out of every read). The provider
 * creates and owns its MongoClient: `init()` connects, `close()` disconnects.
 */
export class MongoStorageProvider implements StorageProvider {
  private readonly client: MongoClient;
  // Assigned in init(), which runs before any other method.
  private entries!: Collection<Entry>;

  /**
   * @param url    MongoDB connection string.
   * @param dbName Database holding the Telescope collections.
   */
  constructor(private readonly url: string, private readonly dbName = 'telescope') {
    this.client = new MongoClient(this.url);
  }

  // --- lifecycle ----------------------------------------------------------

  /** Connects, binds the entries collection, and ensures the indexes exist. */
  async init(): Promise<void> {
    await this.client.connect();
    const db: Db = this.client.db(this.dbName);
    this.entries = db.collection<Entry>('telescope_entries');
    // Indexes that mirror what the queries below scan. Newest-first reads use
    // the (createdAt, id) compound; the batch view uses batchId.
    await this.entries.createIndex({ createdAt: -1, id: -1 });
    await this.entries.createIndex({ batchId: 1, sequence: 1 });
    await this.entries.createIndex({ id: 1 }, { unique: true });
  }

  // We OWN the MongoClient, so close it. A provider handed a host-owned
  // connection would make this a no-op instead.
  async close(): Promise<void> {
    await this.client.close();
  }

  // --- writes -------------------------------------------------------------

  /** Inserts all entries in one unordered bulk insert; an empty batch is a no-op. */
  async store(entries: Entry[]): Promise<void> {
    if (entries.length === 0) return;
    // The driver adds an `_id` to every document object it is handed. Insert
    // shallow copies so the caller's Entry objects are left untouched.
    const docs = entries.map((entry) => ({ ...entry }));
    await this.entries.insertMany(docs, { ordered: false });
  }

  /** Applies `patch` to the entry with the given id; `id` itself is never changed. */
  async update(id: string, patch: Partial<Entry>): Promise<void> {
    // `id` is immutable — strip it from the patch before applying.
    const { id: _ignored, ...fields } = patch;
    // Nothing left to write: skip the round-trip (and the empty `$set`, which
    // older MongoDB servers reject outright).
    if (Object.keys(fields).length === 0) return;
    await this.entries.updateOne({ id }, { $set: fields });
  }

  // --- reads --------------------------------------------------------------

  /** Returns the entry plus every entry of its batch, or null when the id is unknown. */
  async find(id: string): Promise<EntryWithBatch | null> {
    const entry = await this.entries.findOne({ id }, { projection: { _id: 0 } });
    if (entry === null) return null;
    const batch = await this.batch(entry.batchId);
    return { ...entry, batch };
  }

  /**
   * Lists entries newest-first (createdAt DESC, id DESC) using keyset
   * pagination. `nextCursor` is null when no further page exists. The page
   * size defaults to 50 when the query carries no limit.
   */
  async get(query: EntryQuery): Promise<Page<Entry>> {
    const filter = this.toFilter(query);
    const limit = query.limit ?? 50;

    // Keyset pagination: a cursor encodes a (createdAt, id) position; return
    // entries strictly OLDER than it. Resuming from a pruned position still
    // works because we compare positions, not row offsets.
    const cursor = query.cursor === undefined ? null : decodeCursor(query.cursor);
    if (cursor !== null) {
      const at = new Date(cursor.createdAt);
      filter.$or = [
        { createdAt: { $lt: at } },
        { createdAt: at, id: { $lt: cursor.id } },
      ];
    }

    const projection = query.omitContent === true
      ? { _id: 0, content: 0 } // skip the big column for aggregate scans
      : { _id: 0 };

    // Fetch limit + 1 to learn whether a next page exists.
    const rows = await this.entries
      .find(filter, { projection })
      .sort({ createdAt: -1, id: -1 })
      .limit(limit + 1)
      .toArray();

    const hasMore = rows.length > limit;
    // With omitContent, pin content to null so callers cannot depend on it.
    const data = (hasMore ? rows.slice(0, limit) : rows).map((row) =>
      query.omitContent === true ? { ...row, content: null } : row,
    );

    // The cursor for the next page is the position of the last row returned.
    const last = data.at(-1);
    const nextCursor =
      hasMore && last !== undefined
        ? encodeCursor(last.createdAt.getTime(), last.id)
        : null;

    return { data, nextCursor };
  }

  /** Returns every entry of a batch, ordered by ascending sequence. */
  async batch(batchId: string): Promise<Entry[]> {
    return this.entries
      .find({ batchId }, { projection: { _id: 0 } })
      .sort({ sequence: 1 })
      .toArray();
  }

  /**
   * Counts how many entries carry each tag. With `prefix`, only tags that
   * literally start with it are counted (the prefix is not a regex).
   */
  async tags(prefix?: string): Promise<TagCount[]> {
    const tagFilter =
      prefix === undefined
        ? null
        : { tags: { $regex: `^${escapeRegexLiteral(prefix)}` } };
    return this.entries
      .aggregate<TagCount>([
        // Narrow to entries having at least one matching tag before unwinding.
        { $match: tagFilter ?? {} },
        { $unwind: '$tags' },
        // After unwinding, drop the non-matching tags of those entries.
        ...(tagFilter === null ? [] : [{ $match: tagFilter }]),
        { $group: { _id: '$tags', count: { $sum: 1 } } },
        { $project: { _id: 0, tag: '$_id', count: 1 } },
      ])
      .toArray();
  }

  // --- retention ----------------------------------------------------------

  /**
   * Deletes entries created before `olderThan` and returns how many were
   * removed.
   */
  async prune(olderThan: Date, keepLast?: number): Promise<number> {
    const filter: Filter<Entry> = { createdAt: { $lt: olderThan } };
    // keepLast: never drop the newest N entries even if older than the cutoff.
    if (keepLast !== undefined && keepLast > 0) {
      const survivors = await this.entries
        .find({}, { projection: { id: 1, _id: 0 } })
        .sort({ createdAt: -1, id: -1 })
        .limit(keepLast)
        .toArray();
      filter.id = { $nin: survivors.map((row) => row.id) };
    }
    const { deletedCount } = await this.entries.deleteMany(filter);
    return deletedCount;
  }

  /** Removes every stored entry. */
  async clear(): Promise<void> {
    await this.entries.deleteMany({});
  }

  // --- query translation --------------------------------------------------

  /** Translates the cursor-independent part of an EntryQuery into a Mongo filter. */
  private toFilter(query: EntryQuery): Filter<Entry> {
    const filter: Filter<Entry> = {};
    if (query.type !== undefined) filter.type = query.type;
    if (query.tag !== undefined) filter.tags = query.tag;
    if (query.familyHash !== undefined) filter.familyHash = query.familyHash;
    if (query.batchId !== undefined) filter.batchId = query.batchId;
    if (query.traceId !== undefined) filter.traceId = query.traceId;
    // An empty ids array yields `$in: []`, which matches nothing.
    if (query.ids !== undefined) filter.id = { $in: query.ids };
    if (query.before !== undefined || query.after !== undefined) {
      filter.createdAt = {
        ...(query.before !== undefined ? { $lt: query.before } : {}),
        ...(query.after !== undefined ? { $gte: query.after } : {}),
      };
    }
    if (query.search !== undefined) {
      // Case-insensitive substring over serialized content (a request matches
      // by uri, a query by sql, an exception by message, ...). The term is
      // escaped so characters such as `.`, `(` or `*` match themselves.
      // NOTE(review): `$regex` only matches string values — confirm how
      // `content` is serialized in your collection.
      filter['content'] = { $regex: escapeRegexLiteral(query.search), $options: 'i' };
    }
    return filter;
  }
}

Wire it like any adapter:

// process.env values are `string | undefined`, while the provider needs a
// string: fail fast at boot instead of handing the driver an undefined URL.
const mongoUrl = process.env.MONGO_URL;
if (mongoUrl === undefined) {
  throw new Error('MONGO_URL is not set');
}

TelescopeModule.forRoot({
  storage: new MongoStorageProvider(mongoUrl),
});

init() is awaited at boot before any other method; close() runs once at shutdown after the final flush. You call neither.


How it works — the load-bearing lines

  • get() is the whole adapter. Every list screen, type tab, and free-text search funnels through it. Sort createdAt DESC, id DESC and honor the cursor exactly as shown.
  • encodeCursor / decodeCursor are exported from core — reuse them so your cursor is the same opaque (createdAtMs, id) token every other adapter produces. Don't invent your own format.
  • Fetch limit + 1. That extra row is how you decide whether to emit a nextCursor. Emit null when there's no next page so the UI stops paging.
  • omitContent lets aggregate scans (pulse, timeseries) skip the large content column. Project it out where your driver can; whatever happens, set content: null on the returned rows so callers can never accidentally depend on it.
  • ids is for batched hydration — one query for N ids instead of N find() round-trips. An empty array must return no entries ($in: [] does this for free).
  • update() must treat id as immutable — strip it from the patch. Telescope uses update() to patch an already-stored entry in place; the request entry, for example, is patched when the request completes.

The keyset-pagination contract

get() returns entries strictly older than the cursor position, sorted newest-first — never an offset/skip. The contract is explicit: if the entry a cursor was created from has since been pruned, pagination must still resume from that position, not return an empty page. Comparing the (createdAt, id) pair (as the $or above does) satisfies this for free; a row-offset skip does not, and would silently drop or repeat entries as the pruner runs. An undecodable cursor is silently ignored and paging starts from the first page — decodeCursor returns null, which the example treats as "no cursor".


Advanced — pre-aggregated rollups

get() alone is enough; pulse and timeseries fall back to a raw scan. If your store can aggregate cheaply (most can), additionally implement the optional RollupStore SPI from rollup-store.ts. It's detected at runtime via duck-typing (isRollupStore) — it is deliberately not part of StorageProvider, so adding it is purely additive and a store that omits it keeps working.

import {
  type RollupBucket,
  type RollupDelta,
  type RollupStore,
  HISTOGRAM_BUCKET_COUNT,
} from '@dudousxd/nestjs-telescope';

/**
 * Rebuilds a fixed-length histogram array from whatever shape Mongo stored.
 * Accepts a real array or an object keyed by cell index ("0", "1", ...);
 * missing or non-numeric cells become 0.
 */
function toHistogramArray(stored: unknown, length: number): number[] {
  const cells: Record<string, unknown> =
    typeof stored === 'object' && stored !== null
      ? (stored as Record<string, unknown>)
      : {};
  return Array.from({ length }, (_, index) => {
    const cell = cells[String(index)];
    return typeof cell === 'number' ? cell : 0;
  });
}

export class MongoStorageProvider implements StorageProvider, RollupStore {
  // ...everything above, plus:

  private rollups!: Collection<RollupBucket>;
  // (create it in init(): db.collection('telescope_rollups'),
  //  unique index on { metric: 1, bucketStart: 1 })

  /**
   * Additively merges each delta into its (metric, bucketStart) bucket:
   * `$inc` accumulates count, sum and every histogram cell, `$max` keeps the
   * running maximum, and the upsert creates the bucket on its first write.
   *
   * NOTE(review): `$inc` is additive but not replay-safe — applying the
   * identical delta twice counts it twice. Confirm how the recorder
   * de-duplicates if it can redeliver a delta.
   */
  async recordRollups(deltas: RollupDelta[]): Promise<void> {
    for (const delta of deltas) {
      // One `$inc` entry per histogram cell, addressed by dotted path.
      const histInc: Record<string, number> = {};
      for (let i = 0; i < HISTOGRAM_BUCKET_COUNT; i += 1) {
        histInc[`histogram.${i}`] = delta.histogram[i] ?? 0;
      }
      await this.rollups.updateOne(
        { metric: delta.metric, bucketStart: delta.bucketStart },
        {
          $inc: { count: delta.count, sum: delta.sum, ...histInc },
          $max: { max: delta.max },
        },
        { upsert: true },
      );
    }
  }

  /**
   * Returns the buckets of the given metrics whose bucketStart lies in
   * [fromBucket, toBucket] (both inclusive).
   */
  async queryRollups(
    metrics: string[],
    fromBucket: number,
    toBucket: number,
  ): Promise<RollupBucket[]> {
    const rows = await this.rollups
      .find({
        metric: { $in: metrics },
        bucketStart: { $gte: fromBucket, $lte: toBucket },
      })
      .project<RollupBucket>({ _id: 0 })
      .toArray();
    // `$inc` on `histogram.<i>` against a missing field makes Mongo create an
    // embedded document keyed "0".."N-1", not an array. Convert it back so
    // callers receive the fixed-length array form.
    // (Assumes RollupBucket.histogram is number[] — confirm in rollup-store.ts.)
    return rows.map((row) => ({
      ...row,
      histogram: toHistogramArray(row.histogram, HISTOGRAM_BUCKET_COUNT),
    }));
  }
}

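Rollup buckets are 1-minute granularity (ROLLUP_BUCKET_MS) and each carries a fixed-length latency histogram (HISTOGRAM_BUCKET_COUNT cells). recordRollups must be additive and idempotent — the recorder may replay the same (metric, bucketStart) delta, so use $inc/$max upserts, never a blind overwrite. Be aware of what each operator guarantees: the upsert makes the write safe whether or not the bucket already exists, and $max is unaffected by a repeat, but a plain $inc adds the delta again each time it is applied — confirm how the recorder de-duplicates before relying on $inc alone for replay safety.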

On this page