Server-Side Operations
plasma’s headline story is the browser + Worker sync loop, but the server ships several APIs that only make sense from server-side code: admin dashboards, scheduled compactors, historical snapshots, and DO poke helpers. This guide walks through all of them.
serverLiveSelect — reactive queries from server code
Section titled “serverLiveSelect — reactive queries from server code”Want an admin dashboard that shows “here’s every organisation’s
todo count, updated live”? The browser side’s useLiveQuery isn’t
the tool — it runs against IDB, not D1. Reach for
serverLiveSelect:
import { fromD1, serverLiveSelect } from "@sh1n4ps/plasma-server"
const executor = fromD1(env.DB)
const handle = serverLiveSelect({ executor, tables: ["todos", "users"], // fires only when these tables change pollIntervalMs: 1000, // heartbeat interval
fetch: async () => { // Any SQL you want — this bypasses auth.read, so it's for // admin / cross-tenant visibility only. const rows = await executor.all({ text: "SELECT userId, COUNT(*) AS n FROM todos GROUP BY userId", params: [], }) return rows },
onChange: (rows) => { broadcastToAdmins(rows) // push over WebSocket / SSE / whatever },})
// laterhandle.stop()Key properties:
- Polls the change log. When any row in
tablesgets a newrow_version, the runner re-invokesfetchand emits viaonChange. tablesfilter is optional. Omit it to react to every writable table.pollIntervalMsis a heartbeat too. Even in quiet periods thefetchruns so callers can spot lag or the runner is alive. Default 1000ms.- Bypasses auth. This is for admin / server-only paths. Do
NOT expose the result verbatim to end users unless you re-apply
the row auth predicates in
fetch. - One polling loop per subscriber. For fan-out at scale,
reach for
SyncCoordinator+PlasmaClienton each subscriber instead.serverLiveSelecttrades scale for simplicity.
snapshotAsOf — replay a past state
Section titled “snapshotAsOf — replay a past state”Rebuild what any table looked like at an earlier point in time:
import { snapshotAsOf } from "@sh1n4ps/plasma-server"
const past = await snapshotAsOf({ schema, executor: fromD1(env.DB), cookie: "12345", // a row_version cursor (as string or number)})
// past is a Map<tableName, Map<rowId, row>>const todosThen = past.get("todos")for (const [id, row] of todosThen ?? []) { console.log(id, row)}How it works: replays every _plasma_changes entry with
row_version <= cookie. Rows deleted before cookie are absent
from the result; rows updated after cookie reflect their
earlier value.
Cost: O(number of change log entries). For a table with 10M
writes this is measurable — reach for compactChangeLog if the
audit window doesn’t need every intermediate state.
Use cases:
- Audit dashboards (“show the state 30 days ago”)
- Compliance replays (“what was the record on 2026-01-15?”)
- “Compare with a week ago” UIs
- Debugging: “when did this row change from X to Y?”
compactChangeLog — trim the change log
Section titled “compactChangeLog — trim the change log”Every write adds a row to _plasma_changes. Over time the log
grows without bound and pulls get slow. compactChangeLog folds
intermediate states down to just the latest, per
(table_name, row_id):
import { compactChangeLog } from "@sh1n4ps/plasma-server"
const result = await compactChangeLog({ executor: fromD1(env.DB), safeUpToVersion: 5000000, // clients whose cookies < this see the folded state})
console.log(`Removed ${result.removed} intermediate versions, kept ${result.kept} rows`)Correctness contract:
- Clients whose pull cookie is at or beyond
safeUpToVersionare unaffected — they’d have skipped past those rows anyway. - Clients whose cookie is below
safeUpToVersionsee the row jump directly to the folded state. They miss the intermediate transitions but still converge to the truth. snapshotAsOf(cookie < safeUpToVersion)becomes less accurate — the intermediate history is gone.
Where to pick safeUpToVersion:
- Ideal:
min(cookie across live clients). If you can enumerate active pull cookies, use their minimum. - Practical: leave a generous margin. “60 days ago’s max row_version” is a safe heuristic — no realistic client is 60 days stale.
Run from a scheduled Worker (weekly is common):
export default { async scheduled(_event, env, ctx) { const executor = fromD1(env.DB) ctx.waitUntil(compactChangeLog({ executor, safeUpToVersion: await computeSafeCursor(executor), })) },}gcOrphanedBlobs — free R2 storage
Section titled “gcOrphanedBlobs — free R2 storage”Deleting a row with a file() column drops the reference count.
When the reference count is zero, the blob becomes eligible for
GC — but with a grace window so re-attach flows can hit the
cache:
import { gcOrphanedBlobs, r2Storage } from "@sh1n4ps/plasma-server"
const result = await gcOrphanedBlobs({ executor: fromD1(env.DB), storage: r2Storage({ bucket: env.BUCKET }), minOrphanAgeMs: 7 * 24 * 3600 * 1000, // 7 day grace limit: 500, // per invocation cap})
console.log(`Deleted ${result.deleted} blobs, ${result.remaining} more waiting`)Full detail on the blob lifecycle is in Files and Blobs.
reconcileBlobRefs — rebuild after raw DDL
Section titled “reconcileBlobRefs — rebuild after raw DDL”If your admin scripts write to user tables directly (not through
createSyncHandler), the _plasma_blob_refs counter drifts. Fix
it:
import { reconcileBlobRefs } from "@sh1n4ps/plasma-server"
await reconcileBlobRefs({ schema, executor: fromD1(env.DB), dialect: sqliteDialect,})Rare in practice. Keep it in the toolbox for post-migration sanity or after a bulk import.
pokeCoordinator — trigger a WebSocket poke from server code
Section titled “pokeCoordinator — trigger a WebSocket poke from server code”The SyncCoordinator DO fans out poke messages to every
connected WebSocket in a room. pokeCoordinator is how you
trigger a poke from server-side code without going through the
DO stub yourself:
import { pokeCoordinator } from "@sh1n4ps/plasma-server/coordinator"
// From your sync handler, cron job, external webhook, etc.await pokeCoordinator(env.COORDINATOR, { room: `group-${clientGroupID}`, token: env.POKE_TOKEN,})Options:
room— the room to poke. Default"global". Match the scheme the client uses (clientGroupIDby default; per-doc keys if you overrode).token— required. Guards against random processes on your Worker network being able to trigger pokes. Configure a shared secret and check it inside a custom coordinator subclass if you’re paranoid.
Usage patterns:
- From
createSyncHandler: plasma pokes automatically after a successful push. You don’t need to callpokeCoordinatormanually for the sync loop. - From a scheduled Worker: after a batch import or cron update, poke the affected room so live tabs pull sooner than the default 5s.
- From an external webhook: your third-party service triggered a change; you want plasma clients to know without waiting for the poll timer.
createServerDb — the query builder outside the sync handler
Section titled “createServerDb — the query builder outside the sync handler”createServerDb gives you the same Db<Schema> builder the
mutator sees, but for use outside a mutator — cron jobs, admin
scripts, background aggregators:
import { createServerDb, fromD1 } from "@sh1n4ps/plasma-server"
const db = createServerDb({ schema, executor: fromD1(env.DB), ctx: { userId: "admin", role: "admin" }, // auth applies (see below)})
const todos = await db.select().from(todos).where(...)Auth behaviour:
- If you declared
TableOptions.auth: { read, write }on the table,createServerDbruns those predicates against thectxyou supplied. - To bypass auth for admin work, pass a permissive predicate in
ctx (
{ userId: "*", role: "admin" }+ adjust yourauth.readto short-circuit onrole === "admin").
invokeMutator — server-side mutator run
Section titled “invokeMutator — server-side mutator run”Call a mutator by name from server code (cron, webhook, admin tool):
import { invokeMutator } from "@sh1n4ps/plasma-core"
await invokeMutator(mutators, "markDone", { db: createServerDb({ schema, executor, ctx: { userId: "system" } }), args: { id: "t1", updatedAt: Date.now() }, ctx: { userId: "system" }, clientID: "system", mutationID: 0,})Writes land in _plasma_changes and get delivered to clients on
their next pull, just like a mutation that came from the wire.
Use cases:
- Cron jobs that update rows on a schedule (mark stale todos as done, expire trials, etc.).
- Webhooks from external services.
- Admin operations you want on the audit trail.
Which server-side API for what
Section titled “Which server-side API for what”| Task | Reach for |
|---|---|
| Admin dashboard live view | serverLiveSelect |
| Historical / audit replay | snapshotAsOf |
| Change log growing out of hand | compactChangeLog |
| R2 disk usage growing | gcOrphanedBlobs |
| Blob refs drift after DDL | reconcileBlobRefs |
| Push a WebSocket poke by hand | pokeCoordinator |
| Run the query builder from a cron | createServerDb |
| Run a mutator from a cron | invokeMutator |
What to read next
Section titled “What to read next”- Deployment — where to wire the
scheduledhandler for the cron-shaped APIs - Files and Blobs — full blob GC lifecycle
- Migrations —
runMigrationsfor schema changes - Presence — the other coordinator-adjacent API