Custom storage adapter
Persist Telescope entries in a store no shipped adapter covers — implement the StorageProvider SPI end to end, with the keyset-pagination contract and the optional RollupStore add-on.
A storage adapter is a single class implementing StorageProvider. The API, dashboard, and pruner only ever touch your store through this interface — so once it compiles and honors the keyset-pagination contract, every screen works against it. This recipe builds a complete MongoDB-backed provider; the Mongo calls are clearly-marked sketches, but every Telescope-facing line is exact.
See Storage for the design rationale; this page is the implementation.
The contract
Read the full SPI from storage-provider.ts:
interface StorageProvider {
store(entries: Entry[]): Promise<void>;
update(id: string, patch: Partial<Entry>): Promise<void>;
find(id: string): Promise<EntryWithBatch | null>;
get(query: EntryQuery): Promise<Page<Entry>>;
batch(batchId: string): Promise<Entry[]>;
tags(prefix?: string): Promise<TagCount[]>;
prune(olderThan: Date, keepLast?: number): Promise<number>;
clear(): Promise<void>;
init?(): void | Promise<void>; // optional — acquire resources / ensure schema
close?(): void | Promise<void>; // optional — release OWNED resources
}An Entry is { id, batchId, type, familyHash, content, tags, sequence, durationMs, origin, instanceId, traceId, spanId, createdAt } — persist all of it; the dashboard reads every field.
Full example — a Mongo-backed provider
import {
type Entry,
type EntryQuery,
type EntryWithBatch,
type Page,
type StorageProvider,
type TagCount,
encodeCursor,
decodeCursor,
} from '@dudousxd/nestjs-telescope';
import { type Collection, type Db, type Filter, MongoClient } from 'mongodb';
export class MongoStorageProvider implements StorageProvider {
private client: MongoClient;
private entries!: Collection<Entry>;
constructor(private readonly url: string, private readonly dbName = 'telescope') {
this.client = new MongoClient(this.url);
}
// --- lifecycle ----------------------------------------------------------
async init(): Promise<void> {
await this.client.connect();
const db: Db = this.client.db(this.dbName);
this.entries = db.collection<Entry>('telescope_entries');
// Indexes that mirror what the queries below scan. Newest-first reads use
// the (createdAt, id) compound; the batch view uses batchId.
await this.entries.createIndex({ createdAt: -1, id: -1 });
await this.entries.createIndex({ batchId: 1, sequence: 1 });
await this.entries.createIndex({ id: 1 }, { unique: true });
}
// We OWN the MongoClient, so close it. A provider handed a host-owned
// connection would make this a no-op instead.
async close(): Promise<void> {
await this.client.close();
}
// --- writes -------------------------------------------------------------
async store(entries: Entry[]): Promise<void> {
if (entries.length === 0) return;
await this.entries.insertMany(entries, { ordered: false });
}
async update(id: string, patch: Partial<Entry>): Promise<void> {
// `id` is immutable — strip it from the patch before applying.
const { id: _ignored, ...fields } = patch;
await this.entries.updateOne({ id }, { $set: fields });
}
// --- reads --------------------------------------------------------------
async find(id: string): Promise<EntryWithBatch | null> {
const entry = await this.entries.findOne({ id }, { projection: { _id: 0 } });
if (entry === null) return null;
const batch = await this.batch(entry.batchId);
return { ...entry, batch };
}
async get(query: EntryQuery): Promise<Page<Entry>> {
const filter = this.toFilter(query);
const limit = query.limit ?? 50;
// Keyset pagination: a cursor encodes a (createdAt, id) position; return
// entries strictly OLDER than it. Resuming from a pruned position still
// works because we compare positions, not row offsets.
const cursor = query.cursor === undefined ? null : decodeCursor(query.cursor);
if (cursor !== null) {
const at = new Date(cursor.createdAt);
filter.$or = [
{ createdAt: { $lt: at } },
{ createdAt: at, id: { $lt: cursor.id } },
];
}
const projection = query.omitContent === true
? { _id: 0, content: 0 } // skip the big column for aggregate scans
: { _id: 0 };
// Fetch limit + 1 to learn whether a next page exists.
const rows = await this.entries
.find(filter, { projection })
.sort({ createdAt: -1, id: -1 })
.limit(limit + 1)
.toArray();
const hasMore = rows.length > limit;
const data = (hasMore ? rows.slice(0, limit) : rows).map((row) =>
query.omitContent === true ? { ...row, content: null } : row,
);
const last = data.at(-1);
const nextCursor =
hasMore && last !== undefined
? encodeCursor(last.createdAt.getTime(), last.id)
: null;
return { data, nextCursor };
}
async batch(batchId: string): Promise<Entry[]> {
return this.entries
.find({ batchId }, { projection: { _id: 0 } })
.sort({ sequence: 1 })
.toArray();
}
async tags(prefix?: string): Promise<TagCount[]> {
const match = prefix === undefined ? {} : { tags: { $regex: `^${prefix}` } };
const rows = await this.entries
.aggregate<TagCount>([
{ $match: match },
{ $unwind: '$tags' },
...(prefix === undefined
? []
: [{ $match: { tags: { $regex: `^${prefix}` } } }]),
{ $group: { _id: '$tags', count: { $sum: 1 } } },
{ $project: { _id: 0, tag: '$_id', count: 1 } },
])
.toArray();
return rows;
}
// --- retention ----------------------------------------------------------
async prune(olderThan: Date, keepLast?: number): Promise<number> {
// keepLast: never drop the newest N entries even if older than the cutoff.
if (keepLast !== undefined && keepLast > 0) {
const survivors = await this.entries
.find({}, { projection: { id: 1, _id: 0 } })
.sort({ createdAt: -1, id: -1 })
.limit(keepLast)
.toArray();
const keepIds = survivors.map((row) => row.id);
const { deletedCount } = await this.entries.deleteMany({
createdAt: { $lt: olderThan },
id: { $nin: keepIds },
});
return deletedCount;
}
const { deletedCount } = await this.entries.deleteMany({
createdAt: { $lt: olderThan },
});
return deletedCount;
}
async clear(): Promise<void> {
await this.entries.deleteMany({});
}
// --- query translation --------------------------------------------------
private toFilter(query: EntryQuery): Filter<Entry> {
const filter: Filter<Entry> = {};
if (query.type !== undefined) filter.type = query.type;
if (query.tag !== undefined) filter.tags = query.tag;
if (query.familyHash !== undefined) filter.familyHash = query.familyHash;
if (query.batchId !== undefined) filter.batchId = query.batchId;
if (query.traceId !== undefined) filter.traceId = query.traceId;
if (query.ids !== undefined) filter.id = { $in: query.ids };
if (query.before !== undefined || query.after !== undefined) {
filter.createdAt = {
...(query.before !== undefined ? { $lt: query.before } : {}),
...(query.after !== undefined ? { $gte: query.after } : {}),
};
}
if (query.search !== undefined) {
// Case-insensitive substring over serialized content (a request matches
// by uri, a query by sql, an exception by message, ...).
filter['content'] = { $regex: query.search, $options: 'i' };
}
return filter;
}
}Wire it like any adapter:
TelescopeModule.forRoot({
storage: new MongoStorageProvider(process.env.MONGO_URL),
});init() is awaited at boot before any other method; close() runs once at shutdown after the final flush. You call neither.
How it works — the load-bearing lines
get()is the whole adapter. Every list screen, type tab, and free-text search funnels through it. SortcreatedAt DESC, id DESCand honor the cursor exactly as shown.encodeCursor/decodeCursorare exported from core — reuse them so your cursor is the same opaque(createdAtMs, id)token every other adapter produces. Don't invent your own format.- Fetch
limit + 1. That extra row is how you decide whether to emit anextCursor. Emitnullwhen there's no next page so the UI stops paging. omitContentlets aggregate scans (pulse, timeseries) skip the largecontentcolumn. Project it out where your driver can; whatever happens, setcontent: nullon the returned rows so callers can never accidentally depend on it.idsis for batched hydration — one query for N ids instead of Nfind()round-trips. An empty array must return no entries ($in: []does this for free).update()must treatidas immutable — strip it from the patch (the request entry is patched in place when it completes).
The keyset-pagination contract
get() returns entries strictly older than the cursor position, sorted newest-first — never an offset/skip. The contract is explicit: if the cursor's original entry was pruned out from under the cursor, pagination must resume from that position, not return an empty page. Comparing the (createdAt, id) pair (as the $or above does) satisfies this for free; a row-offset skip does not, and would silently drop or repeat entries as the pruner runs. An undecodable cursor is silently ignored and paging starts from the first page — decodeCursor returns null, which the example treats as "no cursor".
Advanced — pre-aggregated rollups
get() alone is enough; pulse and timeseries fall back to a raw scan. If your store can aggregate cheaply (most can), additionally implement the optional RollupStore SPI from rollup-store.ts. It's detected at runtime via duck-typing (isRollupStore) — it is deliberately not part of StorageProvider, so adding it is purely additive and a store that omits it keeps working.
import {
type RollupBucket,
type RollupDelta,
type RollupStore,
HISTOGRAM_BUCKET_COUNT,
} from '@dudousxd/nestjs-telescope';
export class MongoStorageProvider implements StorageProvider, RollupStore {
// ...everything above, plus:
private rollups!: Collection<RollupBucket>;
// (create it in init(): db.collection('telescope_rollups'),
// unique index on { metric: 1, bucketStart: 1 })
// Additively merge deltas: accumulate count/sum, keep a running max, and
// element-wise add the fixed-length latency histogram. Idempotent per
// (metric, bucketStart) via upsert.
async recordRollups(deltas: RollupDelta[]): Promise<void> {
for (const delta of deltas) {
const histInc: Record<string, number> = {};
for (let i = 0; i < HISTOGRAM_BUCKET_COUNT; i += 1) {
histInc[`histogram.${i}`] = delta.histogram[i] ?? 0;
}
await this.rollups.updateOne(
{ metric: delta.metric, bucketStart: delta.bucketStart },
{
$inc: { count: delta.count, sum: delta.sum, ...histInc },
$max: { max: delta.max },
},
{ upsert: true },
);
}
}
async queryRollups(
metrics: string[],
fromBucket: number,
toBucket: number,
): Promise<RollupBucket[]> {
return this.rollups
.find({
metric: { $in: metrics },
bucketStart: { $gte: fromBucket, $lte: toBucket },
})
.project<RollupBucket>({ _id: 0 })
.toArray();
}
}Rollup buckets are 1-minute granularity (ROLLUP_BUCKET_MS) and each carries a fixed-length latency histogram (HISTOGRAM_BUCKET_COUNT cells). recordRollups must be additive and idempotent — the recorder may replay the same (metric, bucketStart) delta, so use $inc/$max upserts, never a blind overwrite.
Recipes
Copy-pasteable cookbook for extending Telescope — a custom storage adapter, custom watchers, dashboard login, custom tags and redaction, request-context capture, archiving to S3 before prune, reporting frontend errors, and AI exception diagnosis.
Custom watcher
Capture a source Telescope doesn't ship a watcher for — a tiny WebSocket-events watcher built on ctx.record(), and the real instrument(emit, ctx) escape hatch for a bespoke cache, both correlated to the request that caused them.