CRDT Columns
CRDT(Conflict-free Replicated Data Type)column は、2 つのクライアントが同じ行に並行書き込みし、ロスレスにマージすることを可能にします。plasma は 4 つを提供します: grow-only カウンター、符号付きカウンター、last-writer-wins レジスター、observed-remove 集合です。
なぜ CRDT か
Section titled “なぜ CRDT か”カウンターをインクリメントする 2 つのタブを考えます:
タブ A が counter = 5 を読み、counter = 6 に設定。タブ B が counter = 5 を読み、counter = 6 に設定。サーバーは 2 つの "6 に設定" 書き込みを見て、last-write-wins → counter = 6。実際: 両タブがインクリメントした; 正しい値は 7 であるべき。CRDT column はこれを、「現在の値」ではなく、サーバーが 2 つの並行貢献をロスレスにマージできる構造を格納することで修正します。crdtCounter は { clientA: 1, clientB: 1 } を格納し、sum = 2 として読み戻されます — サーバーは 2 つの貢献が到着した順序に関わらず、これに正しく収束します。
G-Counter — crdtCounter()
Section titled “G-Counter — crdtCounter()”Grow-only カウンター。各クライアントは自身のスロットを所有し、それを単調に増加させます。観測される値はすべてのスロットの合計です。
const teamStats = table("teamStats", { id: id(), completions: crdtCounter(),})sumCrdtCounter で読み取り:
const stats = useLiveQuery(() => db.select().from(teamStats), [])const total = sumCrdtCounter(stats[0]?.completions)crdtIncrement で書き込み:
export const mutators = defineMutators<typeof schema, Ctx>()({ logCompletion: async ({ db, args, clientID }) => { const rows = await db.select().from(teamStats).where(eq(teamStats.id, args.id)) const current = rows[0]?.completions as CrdtCounterMap | undefined const next = crdtIncrement(clientID!, 1, current) await db.update(teamStats).set({ completions: next }).where(eq(teamStats.id, args.id)) },})制約: crdtIncrement は正のデルタのみを受け取ります。デクリメントが必要なカウンターや両方向に振れるカウンターには crdtPnCounter が必要です。
PN-Counter — crdtPnCounter()
Section titled “PN-Counter — crdtPnCounter()”符号付きカウンター。内部的には 2 つの G-Counter です — 1 つは正の貢献用(p)、1 つは負の貢献用(n)。観測される値 = sum(p) - sum(n)。
const stats = table("stats", { id: id(), active: crdtPnCounter(),})pnRead で読み取り:
const total = pnRead(row.active)// 3 (これは p=5, n=2 でも p=4, n=1 でもよい — セマンティクスは気にしない)pnIncrement(正のデルタ)/ pnDecrement(正のデルタをデクリメントとして解釈)で書き込み:
completeTask: async ({ db, args, clientID }) => { const rows = await db.select().from(stats).where(eq(stats.id, "team")) const current = rows[0]?.active as PnCounterMap | undefined const next = pnDecrement(clientID!, 1, current) await db.update(stats).set({ active: next }).where(eq(stats.id, "team"))}
reopenTask: async ({ db, args, clientID }) => { const rows = await db.select().from(stats).where(eq(stats.id, "team")) const current = rows[0]?.active as PnCounterMap | undefined const next = pnIncrement(clientID!, 1, current) await db.update(stats).set({ active: next }).where(eq(stats.id, "team"))}ゼロデルタは no-op です(アロケーションのチャーンなし)。
LWW-Register — crdtLwwRegister<T>()
Section titled “LWW-Register — crdtLwwRegister<T>()”(ts, clientID) タイブレーク付きの last-writer-wins レジスター。「貢献がマージされるべき」ではなく「1 つのクライアントの値が勝つべき」フィールドに適しています — 例えばステータスラベル、担当者、ピン留めフラグ。
const tasks = table("tasks", { id: id(), status: crdtLwwRegister<"open" | "in-progress" | "done">(),})lwwRead で読み取り:
const status = lwwRead(row.status, "open") // 未設定時のフォールバックlwwSet で書き込み — プリミティブを純粋でモックしやすく保つため、タイムスタンプは自分で供給します:
setStatus: async ({ db, args, clientID }) => { const rows = await db.select().from(tasks).where(eq(tasks.id, args.id)) const current = rows[0]?.status as LwwRegister<string> | null | undefined const next = lwwSet(clientID!, args.status, Date.now(), current) await db.update(tasks).set({ status: next }).where(eq(tasks.id, args.id))}マージセマンティクス:
tsが大きい方が勝ちます。- ts が同点の場合、
clientIDが大きい方(辞書順)が勝ちます — 決定的なタイブレークなので、すべてのレプリカが同じ値に収束します。 null/ 未設定のレジスターはもう一方の側にマージされます。
OR-Set — crdtOrSet<T>()
Section titled “OR-Set — crdtOrSet<T>()”Observed-remove 集合。同じ要素の並行 add + remove は「add が勝つ」に解決されます。remove は観測された add タグをトゥームストーン化するだけだからです。
const messages = table("messages", { id: id(), reactions: crdtOrSet<string>(),})orSetValues で読み取り:
const emoji = orSetValues<string>(row.reactions)// ["👍", "❤️"]orSetAdd(clientID, seq, value, current) / orSetRemove(value, current) で書き込み:
addReaction: async ({ db, args, clientID, mutationID }) => { const rows = await db.select().from(messages).where(eq(messages.id, args.messageId)) const current = rows[0]?.reactions as OrSet<string> | undefined const next = orSetAdd(clientID!, mutationID!, args.emoji, current) await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))}
removeReaction: async ({ db, args }) => { const rows = await db.select().from(messages).where(eq(messages.id, args.messageId)) const current = rows[0]?.reactions as OrSet<string> | undefined const next = orSetRemove(args.emoji, current) await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))}すべての add に付く (clientID, seq) タグは、rebase 中の再実行でも同じタグを生成することを保証します — マージは冪等です。
「add が勝つ」性質
Section titled “「add が勝つ」性質”2 つのタブ、一方が 👍 を追加し、もう一方が 👍 を削除:
タブ A: 👍 を追加 → { adds: [tagA] }タブ B: 👍 を追加 → { adds: [tagB] }タブ A: 👍 を削除 → { adds: [tagA], tombstones: [tagA] } ↑ tagA をトゥームストーン化するが、tagB は未観測マージ: { adds: [tagA, tagB], tombstones: [tagA] }読み取り: ["👍"] (tagB が生き残る)メッセージ reaction に最適です: 「ユーザー A が like した」と「ユーザー B が自身の reaction を unlike した」の競合は、A の like を失いません。
ユーザーごとの reaction — key を正しくモデル化する
Section titled “ユーザーごとの reaction — key を正しくモデル化する”message の reaction で crdtOrSet<string> に絵文字をそのまま入れると、微妙な落とし穴があります:
// ❌ 脆い — orSetRemove("👍") は「👍」に一致する全 tag を削るので// 「A が unlike」で B の like も消える (merge で復元されるとはいえ、// 中間の optimistic 状態は誤り)。orSetRemove("👍", row.reactions)要素を user 付きの複合キーに モデル化するのが正解:
// ✅ ユーザー単位の reaction。A の unlike は A 自身の entry のみ削除。orSetAdd(clientID, mutationID, `${userId}:👍`, row.reactions)orSetRemove(`${userId}:👍`, row.reactions)
// 読み取り: userId prefix を落として絵文字だけ、または保持して// 「誰が何をリアクションしたか」を得る。const emoji = orSetValues<string>(row.reactions) .map((s) => s.split(":")[1])Add-wins は tag レベルで働くので、どの interleaving でも収束します。
plasma CRDT が扱わない領域
Section titled “plasma CRDT が扱わない領域”4 種類の column type は counter / register / set 状態をカバーします — per-user アプリで CRDT 形状の並行状態のほとんどはこれで足ります。
扱わない ものは:
- テキスト / リッチテキスト (Yjs / Automerge / Loro 形状)。共同編集される文書本体は OR-Set や LWW では表せず、RGA / YATA / Fugue 型の sequence CRDT が必要。plasma v1.0 は同梱していません。回避策は、serialize した Yjs ドキュメントを
file()/json()に opaque payload として持ち、編集のたび blob 丸ごとを sync する方法 — 収束はしますがコスト高く live-query との組み合わせも効きません。 - 順序ある sequence / list (プレイリスト / ドラッグで並べ替え可能なタスク)。OR-Set はメンバーシップだけで順序を持ちません。
- ツリー (nested outline / move 意味を持つフォルダ構造)。同じく in-CRDT 順序 primitive がないため対象外。
sequence / text CRDT は v1.1 候補としてキュー中 — Roadmap を参照。
自動サーバーマージ
Section titled “自動サーバーマージ”サーバーエンジンは push 時に CRDT column を自動的にマージします。CRDT column に resolveConflict を書く必要はありません。sql-engine.ts の mergeCrdtColumns 関数が、column の種類に基づいて mergeCrdtCounter / mergePnCounter / mergeLwwRegister / mergeOrSet にディスパッチします。
注意点: これは書き込み前に現在のサーバー行がフェッチされたときにのみ実行されます。sql-engine.ts は、書き込みが table のいずれかの CRDT column を触っているときにそれをフェッチします — executeUpdate の touchesCrdt が 4 種類すべてをカバーします。
知っておくべきセマンティクス
Section titled “知っておくべきセマンティクス”- 読み取り/書き込みの型の非対称性。 column の型は観測される値(例: カウンターでは
Column<number>)ですが、書き込みはストレージ形状(CrdtCounterMap、PnCounterMap、…)を渡さなければなりません。呼び出し側は.set({ counter: next as unknown as number })と書きます。この非対称性を内部化する v1.1 のdb.increment(col, delta)高レベル API に予定されています。 - column の種類の変更にはサーバー側の再作成が必要です。 column を
crdtCounterからcrdtPnCounterに切り替える(またはあらゆる種類の変更)ことはrunMigrationsに拒否されます。DROP TABLEして再 migrate するか、新しい名前の新しい table を出荷する必要があります。
次に読むべきもの
Section titled “次に読むべきもの”- Concepts / Optimistic vs Canonical — push 後に CRDT マージがクライアントにどう着地するか
- Conflict resolution — CRDT が状態の形状に合わないときの脱出ハッチ