Skip to content

Mutators

Mutators are the only way to change data through plasma. Everything plasma provides — sync, retry, live queries, CRDT merges, optimistic UI — hangs off the mutator contract.

import { defineMutators, eq } from "@sh1n4ps/plasma-core"
import { schema, todos } from "./schema"
interface Ctx {
userId: string
}
export const mutators = defineMutators<typeof schema, Ctx>()({
markDone: async ({ db, args }) => {
await db.update(todos)
.set({ done: 1 })
.where(eq(todos.id, args.id))
},
})

defineMutators<S, Ctx>() returns an identity function. The type parameters do the heavy lifting:

  • S — your schema type. Makes db.update(todos) know that todos is a table and that its rows have the shape you declared.
  • Ctx — the type of getContext() (client) and auth().ctx (server) return value.

Every mutator receives a single object with three required fields (db, args, ctx) and two optional origin markers (clientID, mutationID):

async ({ db, args, ctx, clientID, mutationID }) => { ... }
  • db — a Db<S> bound to whichever engine invoked the mutator. On the browser: the IndexedDB engine. On the Worker: the D1 (or Postgres) executor. Same interface, different runtime.
  • args — whatever the caller passed as the second argument to client.mutate("name", args). plasma doesn’t inspect this shape; you can put anything JSON-serialisable in.
  • ctx — the current user context. On the client, getContext() is called before every invocation. On the server, auth(req).ctx is passed in.
  • clientID — the per-tab identifier. Optional (clientID?: string) because a mutator might be replayed during rebase where the origin isn’t recorded, but plasma always populates it during normal push/pull.
  • mutationID — the monotonic per-clientID id of this mutation. Useful for CRDT tag construction (see below).

The type of args is captured at the mutator’s definition site:

export const mutators = defineMutators<typeof schema, Ctx>()({
createTodo: async ({
db,
args,
}: {
db: Db<typeof schema>
args: { id: string; title: string; updatedAt: number }
ctx: Ctx
}) => {
await db.insert(todos).values({ id: args.id, title: args.title, updatedAt: args.updatedAt })
},
})

Client callers pick up the type automatically:

// TypeScript knows args must be { id: string; title: string; updatedAt: number }
await client.mutate("createTodo", { id: "x", title: "hi", updatedAt: 0 })

For payloads that cross the wire (i.e. always in real apps), wrap the mutator in the object form so the args schema validates before your logic runs:

import * as v from "valibot"
export const mutators = defineMutators<typeof schema, Ctx>()({
createTodo: {
args: v.object({
id: v.string(),
title: v.pipe(v.string(), v.maxLength(200)),
updatedAt: v.number(),
}),
run: async ({ db, args }) => {
await db.insert(todos).values({ id: args.id, title: args.title, updatedAt: args.updatedAt })
},
},
})

plasma uses the Standard Schema v1 protocol, so Zod, Valibot, ArkType, and Effect Schema all work unchanged. A validation failure throws MutatorValidationError before any db call happens — and, crucially, the mutation ID still advances so a stale client isn’t stuck retrying a poisoned payload.

The db you receive is a fully-typed query builder scoped to the mutator’s engine. Everything you can do at the top level, you can do here.

createTodo: async ({ db, args, ctx }) => {
await db.insert(todos).values({
id: args.id,
title: args.title,
userId: ctx.userId,
updatedAt: args.updatedAt,
})
}

InferInsertRow widens columns with defaults, so you can omit done (has .default(0)) but must supply title (no default).

editTitle: async ({ db, args }) => {
await db.update(todos)
.set({ title: args.title, updatedAt: args.updatedAt })
.where(eq(todos.id, args.id))
},

set() accepts a partial row. where() supports the same predicates as select() (eq, ne, gt, and, or, inArray, etc.).

deleteTodo: async ({ db, args }) => {
await db.delete(todos).where(eq(todos.id, args.id))
}

Sometimes you need to read the current row before writing:

increment: async ({ db, args, clientID }) => {
const rows = await db.select().from(counters).where(eq(counters.id, args.id))
const current = rows[0]?.value as PnCounterMap | undefined
const next = pnIncrement(clientID!, args.delta, current)
await db.update(counters).set({ value: next }).where(eq(counters.id, args.id))
}

Any operation that needs a stable identity for the mutation reaches for these:

addReaction: async ({ db, args, clientID, mutationID }) => {
const rows = await db.select().from(messages).where(eq(messages.id, args.messageId))
const current = rows[0]?.reactions as OrSet<string> | undefined
const next = orSetAdd(clientID!, mutationID!, args.emoji, current)
await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))
}

The orSetAdd(clientID, seq, value, current) primitive tags every addition with (clientID, seq) so re-runs during rebase produce the same tag — the merge is idempotent.

A single mutator can write to any number of tables. plasma runs the whole thing in one transaction on the server engine:

completeAndAudit: async ({ db, args, ctx }) => {
await db.update(todos).set({ done: 1 }).where(eq(todos.id, args.id))
await db.insert(audit).values({
id: crypto.randomUUID(),
action: "complete",
targetId: args.id,
userId: ctx.userId,
at: Date.now(),
})
}

Both changes land in the change log at the same created_at. Live queries watching either table receive their notification once the whole mutator finishes.

  • On the client: mutate() throws (or the returned Promise rejects). The optimistic apply is aborted; the outbox entry that was staged is removed if the mutator threw before any db write landed. Your .catch sees the original error.
  • On the server: the mutator throws → transaction rolls back → _plasma_client_mutations.last_mutation_id still advances so the poison mutation isn’t retried forever → the push response is still { ok: true } on the wire. The client learns about the drop on the next pull: lastMutationIDs reports the bumped watermark but the change log has no matching row, so rebuildOptimistic reverts the row visually.

There is no mutation-error kind in SyncClientError. If your mutator can throw for reasons the client can predict (quota exceeded, invalid state, item deleted), pre-check before calling client.mutate — the reversion UX (the row visually appears then disappears on the next pull) is real and unavoidable without a pre-check.

There’s no such thing. If you need to read, use db.select() directly (via useLiveQuery on the client, or createServerDb on the Worker). Mutators are the write channel.

Two behaviours that are safe if you know them, subtle if you don’t:

  • runMutate invokes the mutator before enqueueing the outbox entry. A mutator that writes to multiple tables and then throws leaves those writes visible in IDB with no matching outbox entry; the next rebuildOptimistic restores the un-thrown-yet base state (net effect: partial write is silently discarded on next pull). If your mutator can partially fail, wrap the writes in a logic guard so it re-runs cleanly on replay.
  • Suppressed tables + mixed-touch mutators. A mutator that writes to both a synced table and a changeLogSuppressed: true table queues an outbox entry (correct — the synced write must reach the server). On rebuildOptimistic the outbox replays, so the mutator re-runs its writes to both tables. Fixed-value inserts are idempotent; a pnIncrement(current, delta) on the suppressed table’s row would double-apply. Keep suppressed-table writes fixed-value or gate them on a version check.

Both behaviours are covered in the CHANGELOG’s “Semantics worth knowing” section.

  • Auth and permissions — restrict who can invoke which mutators against which rows
  • CRDT columns — automatic convergence for counter / register / set-shaped state
  • Testing — how to run mutators against a fake-IndexedDB harness for unit tests