Server-Side Operations

plasma’s headline story is the browser + Worker sync loop, but the server ships several APIs that only make sense from server-side code: admin dashboards, scheduled compactors, historical snapshots, and DO poke helpers. This guide walks through all of them.

serverLiveSelect — reactive queries from server code

Want an admin dashboard that shows “here’s every organisation’s todo count, updated live”? The browser side’s useLiveQuery isn’t the tool — it runs against IDB, not D1. Reach for serverLiveSelect:

import { fromD1, serverLiveSelect } from "@sh1n4ps/plasma-server"

const executor = fromD1(env.DB)
const handle = serverLiveSelect({
  executor,
  tables: ["todos", "users"], // fires only when these tables change
  pollIntervalMs: 1000, // heartbeat interval
  fetch: async () => {
    // Any SQL you want — this bypasses auth.read, so it's for
    // admin / cross-tenant visibility only.
    const rows = await executor.all({
      text: "SELECT userId, COUNT(*) AS n FROM todos GROUP BY userId",
      params: [],
    })
    return rows
  },
  onChange: (rows) => {
    broadcastToAdmins(rows) // push over WebSocket / SSE / whatever
  },
})

// later
handle.stop()

Key properties:

  • Polls the change log. When any row in tables gets a new row_version, the runner re-invokes fetch and emits via onChange.
  • tables filter is optional. Omit it to react to every writable table.
  • pollIntervalMs is a heartbeat too. Even in quiet periods the fetch runs, so callers can spot lag and confirm the runner is alive. Default 1000ms.
  • Bypasses auth. This is for admin / server-only paths. Do NOT expose the result verbatim to end users unless you re-apply the row auth predicates in fetch.
  • One polling loop per subscriber. For fan-out at scale, reach for SyncCoordinator + PlasmaClient on each subscriber instead. serverLiveSelect trades scale for simplicity.
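
The change-detection rule described above can be modelled in a few lines. This is a sketch of the described behaviour, not plasma's internals: the ChangeEntry shape and the nextRelevantVersion helper are hypothetical names introduced here, and the real _plasma_changes columns may differ.

```typescript
// Illustrative model only: this entry shape is an assumption,
// not the real _plasma_changes column layout.
interface ChangeEntry {
  tableName: string
  rowVersion: number
}

// Returns the highest row_version newer than `lastSeen` that touches a
// watched table, or null when nothing relevant changed.
// Omitting `tables` means "watch every table".
function nextRelevantVersion(
  entries: ChangeEntry[],
  lastSeen: number,
  tables?: string[],
): number | null {
  let newest: number | null = null
  for (const entry of entries) {
    if (entry.rowVersion <= lastSeen) continue
    if (tables && !tables.includes(entry.tableName)) continue
    if (newest === null || entry.rowVersion > newest) newest = entry.rowVersion
  }
  return newest
}

const log: ChangeEntry[] = [
  { tableName: "todos", rowVersion: 10 },
  { tableName: "sessions", rowVersion: 11 },
  { tableName: "users", rowVersion: 12 },
]

// A write to "users" at version 12 is relevant, so a re-fetch would be due.
const watched = nextRelevantVersion(log, 10, ["todos", "users"])
// Nothing newer than version 12 exists, so only the heartbeat would fire.
const quiet = nextRelevantVersion(log, 12, ["todos", "users"])
// Without a filter, every table counts.
const unfiltered = nextRelevantVersion(log, 10)
```

A non-null result is the point at which a runner like this would call fetch and pass the rows to onChange.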

snapshotAsOf

Rebuild what any table looked like at an earlier point in time:

import { fromD1, snapshotAsOf } from "@sh1n4ps/plasma-server"

const past = await snapshotAsOf({
  schema,
  executor: fromD1(env.DB),
  cookie: "12345", // a row_version cursor (as string or number)
})

// past is a Map<tableName, Map<rowId, row>>
const todosThen = past.get("todos")
for (const [id, row] of todosThen ?? []) {
  console.log(id, row)
}

How it works: replays every _plasma_changes entry with row_version <= cookie. Rows deleted before cookie are absent from the result; rows updated after cookie reflect their earlier value.
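
To make the replay idea concrete, here is a minimal in-memory sketch. It assumes a simplified log entry with an explicit op field; LogEntry, replayUpTo, and that field layout are hypothetical and only stand in for whatever _plasma_changes actually stores.

```typescript
// Hypothetical change-log entry; the real _plasma_changes columns may differ.
type Row = Record<string, unknown>

interface LogEntry {
  tableName: string
  rowId: string
  rowVersion: number
  op: "upsert" | "delete"
  row?: Row
}

type Snapshot = Map<string, Map<string, Row>>

// Replays entries in row_version order and stops once the cursor is passed.
function replayUpTo(entries: LogEntry[], cursor: number): Snapshot {
  const snapshot: Snapshot = new Map()
  const ordered = [...entries].sort((a, b) => a.rowVersion - b.rowVersion)
  for (const entry of ordered) {
    if (entry.rowVersion > cursor) break
    let table = snapshot.get(entry.tableName)
    if (!table) {
      table = new Map()
      snapshot.set(entry.tableName, table)
    }
    if (entry.op === "delete") table.delete(entry.rowId)
    else table.set(entry.rowId, entry.row ?? {})
  }
  return snapshot
}

const history: LogEntry[] = [
  { tableName: "todos", rowId: "t1", rowVersion: 1, op: "upsert", row: { title: "draft" } },
  { tableName: "todos", rowId: "t2", rowVersion: 2, op: "upsert", row: { title: "temp" } },
  { tableName: "todos", rowId: "t2", rowVersion: 3, op: "delete" },
  { tableName: "todos", rowId: "t1", rowVersion: 4, op: "upsert", row: { title: "final" } },
]

// As of version 3: t2 was already deleted, and t1 still has its earlier title.
const asOf3 = replayUpTo(history, 3)
```

The sketch also shows why the cost scales with the log length: every entry up to the cursor is visited once.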

Cost: O(number of change log entries). For a table with 10M writes this is measurable — reach for compactChangeLog if the audit window doesn’t need every intermediate state.

Use cases:

  • Audit dashboards (“show the state 30 days ago”)
  • Compliance replays (“what was the record on 2026-01-15?”)
  • “Compare with a week ago” UIs
  • Debugging: “when did this row change from X to Y?”

compactChangeLog

Every write adds a row to _plasma_changes. Over time the log grows without bound and pulls get slow. compactChangeLog folds intermediate states down to just the latest, per (table_name, row_id):

import { compactChangeLog, fromD1 } from "@sh1n4ps/plasma-server"

const result = await compactChangeLog({
  executor: fromD1(env.DB),
  safeUpToVersion: 5000000, // clients whose cookies < this see the folded state
})
console.log(`Removed ${result.removed} intermediate versions, kept ${result.kept} rows`)

Correctness contract:

  • Clients whose pull cookie is at or beyond safeUpToVersion are unaffected — they’d have skipped past those rows anyway.
  • Clients whose cookie is below safeUpToVersion see the row jump directly to the folded state. They miss the intermediate transitions but still converge to the truth.
  • snapshotAsOf(cookie < safeUpToVersion) becomes less accurate — the intermediate history is gone.
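
The folding rule can be sketched over an in-memory log. This is an illustration under stated assumptions, not the library's implementation: VersionedEntry and foldChangeLog are hypothetical, row_version values are assumed unique, and treating the boundary as inclusive (entries at or below safeUpToVersion are foldable) is a choice made for this sketch.

```typescript
// Hypothetical, simplified log entry for illustration.
interface VersionedEntry {
  tableName: string
  rowId: string
  rowVersion: number
}

// Below the safe cursor, keep only the latest entry per (tableName, rowId).
// Entries above the cursor are left untouched.
function foldChangeLog(entries: VersionedEntry[], safeUpToVersion: number) {
  const latest = new Map<string, number>()
  for (const e of entries) {
    if (e.rowVersion > safeUpToVersion) continue
    const key = JSON.stringify([e.tableName, e.rowId])
    latest.set(key, Math.max(latest.get(key) ?? -Infinity, e.rowVersion))
  }
  const kept = entries.filter((e) => {
    if (e.rowVersion > safeUpToVersion) return true
    return latest.get(JSON.stringify([e.tableName, e.rowId])) === e.rowVersion
  })
  return { kept, removed: entries.length - kept.length }
}

const changeLog: VersionedEntry[] = [
  { tableName: "todos", rowId: "t1", rowVersion: 1 },
  { tableName: "todos", rowId: "t1", rowVersion: 2 },
  { tableName: "todos", rowId: "t1", rowVersion: 3 },
  { tableName: "todos", rowId: "t2", rowVersion: 4 },
  { tableName: "todos", rowId: "t1", rowVersion: 6 },
]

// Versions 1 and 2 of t1 are intermediate states below the cursor and get dropped.
const folded = foldChangeLog(changeLog, 5)
```

A client pulling from cookie 0 would now see t1 jump straight to version 3, then 6, which matches the contract above: fewer transitions, same final state.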

Where to pick safeUpToVersion:

  • Ideal: min(cookie across live clients). If you can enumerate active pull cookies, use their minimum.
  • Practical: leave a generous margin. The max row_version as of 60 days ago is a safe heuristic, since no realistic client is 60 days stale.
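
One way to combine the two rules is to take whichever is lower: the margin cursor or the smallest live cookie. The helper below is a hypothetical sketch of that policy. pickSafeUpToVersion is not a plasma API, and how you enumerate live cookies or look up the margin version is left to your own code.

```typescript
// Conservative policy: never compact past the stalest known client,
// and never past the margin cursor either.
function pickSafeUpToVersion(liveCookies: number[], marginVersion: number): number {
  let safe = marginVersion
  for (const cookie of liveCookies) {
    if (cookie < safe) safe = cookie
  }
  return safe
}

// A client at 4,800,000 is staler than the margin, so it wins.
const withStaleClient = pickSafeUpToVersion([5_200_000, 4_800_000, 6_100_000], 5_000_000)
// No known clients: fall back to the margin.
const noClients = pickSafeUpToVersion([], 5_000_000)
// All clients are ahead of the margin: the margin still caps the result.
const allFresh = pickSafeUpToVersion([7_000_000], 5_000_000)
```

Capping at the margin even when every known client is ahead keeps some history around for clients you could not enumerate.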

Run from a scheduled Worker (weekly is common):

export default {
  async scheduled(_event, env, ctx) {
    const executor = fromD1(env.DB)
    ctx.waitUntil(compactChangeLog({
      executor,
      safeUpToVersion: await computeSafeCursor(executor),
    }))
  },
}

gcOrphanedBlobs

Deleting a row with a file() column decrements the blob’s reference count. When the count reaches zero, the blob becomes eligible for GC, but only after a grace window so re-attach flows can still hit the cache:

import { fromD1, gcOrphanedBlobs, r2Storage } from "@sh1n4ps/plasma-server"

const result = await gcOrphanedBlobs({
  executor: fromD1(env.DB),
  storage: r2Storage({ bucket: env.BUCKET }),
  minOrphanAgeMs: 7 * 24 * 3600 * 1000, // 7 day grace
  limit: 500, // per invocation cap
})
console.log(`Deleted ${result.deleted} blobs, ${result.remaining} more waiting`)

Full detail on the blob lifecycle is in Files and Blobs.
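
The eligibility rule (zero references, older than the grace window, capped per run) can be sketched as a pure function. The BlobRef shape, its orphanedAt field, and selectGcCandidates are assumptions made for this illustration; the actual bookkeeping table may look different.

```typescript
// Hypothetical bookkeeping record for one blob.
interface BlobRef {
  blobId: string
  refCount: number
  orphanedAt: number | null // epoch ms when refCount last hit zero
}

function selectGcCandidates(
  refs: BlobRef[],
  now: number,
  minOrphanAgeMs: number,
  limit: number,
) {
  const eligible = refs.filter(
    (r) => r.refCount === 0 && r.orphanedAt !== null && now - r.orphanedAt >= minOrphanAgeMs,
  )
  return {
    toDelete: eligible.slice(0, limit),
    remaining: Math.max(0, eligible.length - limit),
  }
}

const DAY = 24 * 3600 * 1000
const now = 100 * DAY

const refs: BlobRef[] = [
  { blobId: "a", refCount: 0, orphanedAt: now - 10 * DAY }, // past the grace window
  { blobId: "b", refCount: 0, orphanedAt: now - 2 * DAY }, // still inside it
  { blobId: "c", refCount: 1, orphanedAt: null }, // still referenced
  { blobId: "d", refCount: 0, orphanedAt: now - 8 * DAY }, // past the grace window
]

// With a 7 day grace and a cap of 1, "a" is deleted and "d" waits for the next run.
const plan = selectGcCandidates(refs, now, 7 * DAY, 1)
```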

reconcileBlobRefs — rebuild after raw DDL

If your admin scripts write to user tables directly (not through createSyncHandler), the _plasma_blob_refs counter drifts. Fix it:

import { fromD1, reconcileBlobRefs } from "@sh1n4ps/plasma-server"

await reconcileBlobRefs({
  schema,
  executor: fromD1(env.DB),
  dialect: sqliteDialect,
})

Rare in practice. Keep it in the toolbox for post-migration sanity or after a bulk import.
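
Conceptually, reconciliation means recounting references from the rows themselves and comparing against the stored counter. The sketch below illustrates that idea only. It assumes file() columns hold a blob id as a plain string, which this guide does not confirm, and countBlobRefs and findDrift are hypothetical helpers.

```typescript
type UserRow = Record<string, unknown>

// Recount references by scanning every file column of every row.
function countBlobRefs(rows: UserRow[], fileColumns: string[]): Map<string, number> {
  const counts = new Map<string, number>()
  for (const row of rows) {
    for (const column of fileColumns) {
      const blobId = row[column]
      if (typeof blobId !== "string") continue // null or missing: no reference
      counts.set(blobId, (counts.get(blobId) ?? 0) + 1)
    }
  }
  return counts
}

// List blob ids whose stored count disagrees with the recount.
function findDrift(stored: Map<string, number>, actual: Map<string, number>): string[] {
  const drifted = new Set<string>()
  actual.forEach((count, blobId) => {
    if ((stored.get(blobId) ?? 0) !== count) drifted.add(blobId)
  })
  stored.forEach((count, blobId) => {
    if ((actual.get(blobId) ?? 0) !== count) drifted.add(blobId)
  })
  return Array.from(drifted).sort()
}

const rows: UserRow[] = [
  { id: "a", attachment: "blob-1" },
  { id: "b", attachment: "blob-1" },
  { id: "c", attachment: null },
  { id: "d", attachment: "blob-2" },
]
const actual = countBlobRefs(rows, ["attachment"])

// Stored counts after a raw bulk import: blob-1 is undercounted, blob-3 is stale.
const stored = new Map([["blob-1", 1], ["blob-2", 1], ["blob-3", 1]])
const drift = findDrift(stored, actual)
```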

pokeCoordinator — trigger a WebSocket poke from server code

The SyncCoordinator DO fans out poke messages to every connected WebSocket in a room. pokeCoordinator is how you trigger a poke from server-side code without going through the DO stub yourself:

import { pokeCoordinator } from "@sh1n4ps/plasma-server/coordinator"

// From your sync handler, cron job, external webhook, etc.
await pokeCoordinator(env.COORDINATOR, {
  room: `group-${clientGroupID}`,
  token: env.POKE_TOKEN,
})

Options:

  • room — the room to poke. Default "global". Match the scheme the client uses (clientGroupID by default; per-doc keys if you overrode).
  • token — required. Guards against random processes on your Worker network being able to trigger pokes. Configure a shared secret and check it inside a custom coordinator subclass if you’re paranoid.
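
If you do check the shared secret yourself, compare it without an early exit so the comparison time does not reveal how many leading characters matched. The helper below is a generic sketch. tokensMatch is a hypothetical name, and how a custom coordinator subclass receives the token is not covered in this guide.

```typescript
// Compares two strings without returning early on the first mismatch.
// JavaScript gives no hard timing guarantees; this only avoids the obvious leak.
function tokensMatch(provided: string, expected: string): boolean {
  let diff = provided.length ^ expected.length
  const len = Math.max(provided.length, expected.length)
  for (let i = 0; i < len; i++) {
    // charCodeAt returns NaN past the end; bitwise operators coerce NaN to 0.
    diff |= provided.charCodeAt(i) ^ expected.charCodeAt(i)
  }
  return diff === 0
}

const accepted = tokensMatch("s3cret", "s3cret")
const wrongCase = tokensMatch("s3cret", "s3creT")
const wrongLength = tokensMatch("s3cret", "s3cret-extra")
```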

Usage patterns:

  • From createSyncHandler: plasma pokes automatically after a successful push. You don’t need to call pokeCoordinator manually for the sync loop.
  • From a scheduled Worker: after a batch import or cron update, poke the affected room so live tabs pull right away instead of waiting for the default 5s poll.
  • From an external webhook: your third-party service triggered a change; you want plasma clients to know without waiting for the poll timer.

createServerDb — the query builder outside the sync handler

createServerDb gives you the same Db<Schema> builder the mutator sees, but for use outside a mutator — cron jobs, admin scripts, background aggregators:

import { createServerDb, fromD1 } from "@sh1n4ps/plasma-server"

const db = createServerDb({
  schema,
  executor: fromD1(env.DB),
  ctx: { userId: "admin", role: "admin" }, // auth applies (see below)
})
// Named `rows` so the result does not shadow the `todos` table passed to from()
const rows = await db.select().from(todos).where(...)

Auth behaviour:

  • If you declared TableOptions.auth: { read, write } on the table, createServerDb runs those predicates against the ctx you supplied.
  • To bypass auth for admin work, pass a privileged ctx (for example { userId: "*", role: "admin" }) and adjust your auth.read to short-circuit on role === "admin".
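
A short-circuiting read predicate might look like the sketch below. The (ctx, row) signature, the AuthCtx and OwnedRow types, and the canRead name are assumptions for illustration; check the shape plasma expects for TableOptions.auth.read before copying this.

```typescript
interface AuthCtx {
  userId: string
  role?: string
}

interface OwnedRow {
  id: string
  userId: string
}

// Admins see everything; everyone else sees only their own rows.
function canRead(ctx: AuthCtx, row: OwnedRow): boolean {
  if (ctx.role === "admin") return true
  return row.userId === ctx.userId
}

const sampleRows: OwnedRow[] = [
  { id: "t1", userId: "alice" },
  { id: "t2", userId: "bob" },
]

// The privileged ctx passes the predicate for every row.
const adminView = sampleRows.filter((row) => canRead({ userId: "*", role: "admin" }, row))
// A regular ctx is still scoped to its owner.
const aliceView = sampleRows.filter((row) => canRead({ userId: "alice" }, row))
```

Keeping the bypass inside the predicate means the admin path still goes through the same auth code as every other caller, so it stays visible in one place.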

invokeMutator

Call a mutator by name from server code (cron, webhook, admin tool):

import { invokeMutator } from "@sh1n4ps/plasma-core"

await invokeMutator(mutators, "markDone", {
  db: createServerDb({ schema, executor, ctx: { userId: "system" } }),
  args: { id: "t1", updatedAt: Date.now() },
  ctx: { userId: "system" },
  clientID: "system",
  mutationID: 0,
})

Writes land in _plasma_changes and get delivered to clients on their next pull, just like a mutation that came from the wire.

Use cases:

  • Cron jobs that update rows on a schedule (mark stale todos as done, expire trials, etc.).
  • Webhooks from external services.
  • Admin operations you want on the audit trail.

Task                               Reach for
---------------------------------  -----------------
Admin dashboard live view          serverLiveSelect
Historical / audit replay          snapshotAsOf
Change log growing out of hand     compactChangeLog
R2 disk usage growing              gcOrphanedBlobs
Blob refs drift after DDL          reconcileBlobRefs
Push a WebSocket poke by hand      pokeCoordinator
Run the query builder from a cron  createServerDb
Run a mutator from a cron          invokeMutator

  • Deployment — where to wire the scheduled handler for the cron-shaped APIs
  • Files and Blobs — full blob GC lifecycle
  • Migrations — runMigrations for schema changes
  • Presence — the other coordinator-adjacent API