コンテンツにスキップ

Mutators

mutator は、plasma を通じてデータを変更する 唯一 の方法です。plasma が提供するすべて — sync、retry、live queries、CRDT マージ、optimistic UI — は mutator の契約の上に成り立っています。

import { defineMutators, eq } from "@sh1n4ps/plasma-core"
import { schema, todos } from "./schema"
interface Ctx {
userId: string
}
export const mutators = defineMutators<typeof schema, Ctx>()({
markDone: async ({ db, args }) => {
await db.update(todos)
.set({ done: 1 })
.where(eq(todos.id, args.id))
},
})

defineMutators<S, Ctx>() は恒等関数を返します。重要な処理は型パラメータが担います:

  • S — schema の型。db.update(todos) に対して、todos が table であり、その行が宣言した形状を持つことを知らせます。
  • CtxgetContext()(クライアント)と auth().ctx(サーバー)の戻り値の型。

すべての mutator は、3 つの必須フィールド(dbargsctx)と 2 つのオプショナルな由来マーカー(clientIDmutationID)を持つ単一のオブジェクトを受け取ります:

async ({ db, args, ctx, clientID, mutationID }) => { ... }
  • db — mutator を呼び出したエンジンにバインドされた Db<S>。ブラウザでは IndexedDB エンジン、Worker では D1(または Postgres)エグゼキュータです。同じインターフェイス、異なるランタイムです。
  • args — 呼び出し側が client.mutate("name", args) の第 2 引数として渡したもの。plasma はこの形状を検査しません。JSON シリアライズ可能なものであれば何でも入れられます。
  • ctx — 現在のユーザーコンテキスト。クライアントでは各呼び出しの前に getContext() が呼ばれます。サーバーでは auth(req).ctx が渡されます。
  • clientID — タブごとの識別子。rebase 中の再実行では由来が記録されない場合があるためオプショナル(clientID?: string)ですが、通常の push/pull 中には plasma が常に埋めます。
  • mutationID — clientID ごとの、この mutation の単調増加 id。CRDT タグの構築に便利です(後述)。

args の型は mutator の 定義 箇所で捕捉されます:

export const mutators = defineMutators<typeof schema, Ctx>()({
createTodo: async ({
db,
args,
}: {
db: Db<typeof schema>
args: { id: string; title: string; updatedAt: number }
ctx: Ctx
}) => {
await db.insert(todos).values({ id: args.id, title: args.title, updatedAt: args.updatedAt })
},
})

クライアントの呼び出し側は自動的に型を受け取ります:

// TypeScript は args が { id: string; title: string; updatedAt: number } でなければならないと知っている
await client.mutate("createTodo", { id: "x", title: "hi", updatedAt: 0 })

ワイヤーを越えるペイロード(つまり実アプリでは常にそう)については、mutator をオブジェクト形式でラップすることで、ロジックが走る前に args schema を検証できます:

import * as v from "valibot"
export const mutators = defineMutators<typeof schema, Ctx>()({
createTodo: {
args: v.object({
id: v.string(),
title: v.pipe(v.string(), v.maxLength(200)),
updatedAt: v.number(),
}),
run: async ({ db, args }) => {
await db.insert(todos).values({ id: args.id, title: args.title, updatedAt: args.updatedAt })
},
},
})

plasma は Standard Schema v1 プロトコルを使うため、Zod、Valibot、ArkType、Effect Schema はすべてそのまま動作します。検証失敗は db の呼び出しが起こる前に MutatorValidationError を throw します — そして重要なことに、mutation ID は依然として進むため、古いクライアントが汚染されたペイロードのリトライで詰まることはありません。

受け取る db は、mutator のエンジンにスコープされた完全に型付けされたクエリビルダーです。トップレベルでできることはすべてここでもできます。

createTodo: async ({ db, args, ctx }) => {
await db.insert(todos).values({
id: args.id,
title: args.title,
userId: ctx.userId,
updatedAt: args.updatedAt,
})
}

InferInsertRow はデフォルトを持つ column を広げるため、done.default(0) を持つ)は省略できますが、title(デフォルトなし)は指定しなければなりません。

editTitle: async ({ db, args }) => {
await db.update(todos)
.set({ title: args.title, updatedAt: args.updatedAt })
.where(eq(todos.id, args.id))
},

set() は部分的な行を受け取ります。where()select() と同じ述語(eqnegtandorinArray など)をサポートします。

deleteTodo: async ({ db, args }) => {
await db.delete(todos).where(eq(todos.id, args.id))
}

書き込む前に現在の行を読む必要がある場合があります:

increment: async ({ db, args, clientID }) => {
const rows = await db.select().from(counters).where(eq(counters.id, args.id))
const current = rows[0]?.value as PnCounterMap | undefined
const next = pnIncrement(clientID!, args.delta, current)
await db.update(counters).set({ value: next }).where(eq(counters.id, args.id))
}

mutation に安定した識別子が必要な操作はこれらを使います:

addReaction: async ({ db, args, clientID, mutationID }) => {
const rows = await db.select().from(messages).where(eq(messages.id, args.messageId))
const current = rows[0]?.reactions as OrSet<string> | undefined
const next = orSetAdd(clientID!, mutationID!, args.emoji, current)
await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))
}

orSetAdd(clientID, seq, value, current) プリミティブはすべての追加を (clientID, seq) でタグ付けするため、rebase 中の再実行でも同じタグが生成されます — マージは冪等です。

1 つの mutator は任意の数の table に書き込めます。plasma はサーバーエンジン上で全体を 1 つのトランザクションで実行します:

completeAndAudit: async ({ db, args, ctx }) => {
await db.update(todos).set({ done: 1 }).where(eq(todos.id, args.id))
await db.insert(audit).values({
id: crypto.randomUUID(),
action: "complete",
targetId: args.id,
userId: ctx.userId,
at: Date.now(),
})
}

両方の変更は同じ created_at で change log に入ります。どちらかの table を監視している live query は、mutator 全体が完了すると通知を受け取ります。

  • クライアント側: mutate() が throw します(または返された Promise が reject します)。optimistic apply は中断されます。ステージされた outbox エントリは、mutator が db 書き込みが着地する前に throw した場合は削除されます。.catch は元のエラーを受け取ります。
  • サーバー側: mutator が throw する → トランザクションがロールバックする → _plasma_client_mutations.last_mutation_id は依然として進むため、汚染された mutation が永遠にリトライされることはない → push レスポンスはワイヤー上では依然として { ok: true } です。クライアントは次の pull でドロップを知ります: lastMutationIDs はインクリメントされたウォーターマークを報告しますが、change log には対応する行がないため、rebuildOptimistic が行を視覚的に取り消します。

SyncClientErrormutation-error の種類は ありません。mutator がクライアントの予測できる理由(クォータ超過、無効な状態、削除済みアイテム)で throw しうるなら、client.mutate を呼ぶ前に事前チェックしてください — 取り消しの UX(行が視覚的に表示され、次の pull で消える)は実際に起こり、事前チェックなしには避けられません。

そのようなものはありません。読み取りが必要なら、db.select() を直接使ってください(クライアントでは useLiveQuery 経由、Worker では createServerDb 経由)。mutator は write チャネルです。

知っておくべきセマンティクス

Section titled “知っておくべきセマンティクス”

知っていれば安全だが、知らなければ落とし穴になる 2 つの挙動です:

  • runMutate は outbox エントリをエンキューする 前に mutator を呼び出します。 複数の table に書き込んでから throw する mutator は、対応する outbox エントリなしにそれらの書き込みを IDB に残します。次の rebuildOptimistic はまだ throw していないベース状態を復元します(正味の効果: 部分的な書き込みは次の pull で静かに破棄される)。mutator が部分的に失敗しうるなら、リプレイ時にクリーンに再実行できるよう、書き込みをロジックガードでラップしてください。
  • suppressed table + 混在タッチの mutator。 synced table と changeLogSuppressed: true の table の両方に書き込む mutator は outbox エントリをキューします(正しい — synced な書き込みはサーバーに届く必要がある)。rebuildOptimistic では outbox がリプレイされるため、mutator は両方の table への書き込みを再実行します。固定値の挿入は冪等ですが、suppressed table の行への pnIncrement(current, delta) は二重適用されます。suppressed table への書き込みは固定値にするか、バージョンチェックでガードしてください。

どちらの挙動も CHANGELOG の “Semantics worth knowing” セクションで扱われています。

  • Auth and permissions — 誰がどの mutator をどの行に対して呼び出せるかを制限する
  • CRDT columns — counter / register / set 形状の状態の自動収束
  • Testing — 単体テストのために mutator を fake-IndexedDB ハーネスに対して実行する方法