コンテンツにスキップ

CRDT Columns

CRDT(Conflict-free Replicated Data Type)column は、2 つのクライアントが同じ行に並行書き込みし、ロスレスにマージすることを可能にします。plasma は 4 つを提供します: grow-only カウンター、符号付きカウンター、last-writer-wins レジスター、observed-remove 集合です。

カウンターをインクリメントする 2 つのタブを考えます:

タブ A が counter = 5 を読み、counter = 6 に設定。
タブ B が counter = 5 を読み、counter = 6 に設定。
サーバーは 2 つの "6 に設定" 書き込みを見て、last-write-wins → counter = 6。
実際: 両タブがインクリメントした; 正しい値は 7 であるべき。

CRDT column はこれを、「現在の値」ではなく、サーバーが 2 つの並行貢献をロスレスにマージできる構造を格納することで修正します。crdtCounter{ clientA: 1, clientB: 1 } を格納し、sum = 2 として読み戻されます — サーバーは 2 つの貢献が到着した順序に関わらず、これに正しく収束します。

Grow-only カウンター。各クライアントは自身のスロットを所有し、それを単調に増加させます。観測される値はすべてのスロットの合計です。

const teamStats = table("teamStats", {
id: id(),
completions: crdtCounter(),
})

sumCrdtCounter で読み取り:

const stats = useLiveQuery(() => db.select().from(teamStats), [])
const total = sumCrdtCounter(stats[0]?.completions)

crdtIncrement で書き込み:

export const mutators = defineMutators<typeof schema, Ctx>()({
logCompletion: async ({ db, args, clientID }) => {
const rows = await db.select().from(teamStats).where(eq(teamStats.id, args.id))
const current = rows[0]?.completions as CrdtCounterMap | undefined
const next = crdtIncrement(clientID!, 1, current)
await db.update(teamStats).set({ completions: next }).where(eq(teamStats.id, args.id))
},
})

制約: crdtIncrement は正のデルタのみを受け取ります。デクリメントが必要なカウンターや両方向に振れるカウンターには crdtPnCounter が必要です。

符号付きカウンター。内部的には 2 つの G-Counter です — 1 つは正の貢献用(p)、1 つは負の貢献用(n)。観測される値 = sum(p) - sum(n)

const stats = table("stats", {
id: id(),
active: crdtPnCounter(),
})

pnRead で読み取り:

const total = pnRead(row.active)
// 3 (これは p=5, n=2 でも p=4, n=1 でもよい — セマンティクスは気にしない)

pnIncrement(正のデルタ)/ pnDecrement(正のデルタをデクリメントとして解釈)で書き込み:

completeTask: async ({ db, args, clientID }) => {
const rows = await db.select().from(stats).where(eq(stats.id, "team"))
const current = rows[0]?.active as PnCounterMap | undefined
const next = pnDecrement(clientID!, 1, current)
await db.update(stats).set({ active: next }).where(eq(stats.id, "team"))
}
reopenTask: async ({ db, args, clientID }) => {
const rows = await db.select().from(stats).where(eq(stats.id, "team"))
const current = rows[0]?.active as PnCounterMap | undefined
const next = pnIncrement(clientID!, 1, current)
await db.update(stats).set({ active: next }).where(eq(stats.id, "team"))
}

ゼロデルタは no-op です(アロケーションのチャーンなし)。

(ts, clientID) タイブレーク付きの last-writer-wins レジスター。「貢献がマージされるべき」ではなく「1 つのクライアントの値が勝つべき」フィールドに適しています — 例えばステータスラベル、担当者、ピン留めフラグ。

const tasks = table("tasks", {
id: id(),
status: crdtLwwRegister<"open" | "in-progress" | "done">(),
})

lwwRead で読み取り:

const status = lwwRead(row.status, "open") // 未設定時のフォールバック

lwwSet で書き込み — プリミティブを純粋でモックしやすく保つため、タイムスタンプは自分で供給します:

setStatus: async ({ db, args, clientID }) => {
const rows = await db.select().from(tasks).where(eq(tasks.id, args.id))
const current = rows[0]?.status as LwwRegister<string> | null | undefined
const next = lwwSet(clientID!, args.status, Date.now(), current)
await db.update(tasks).set({ status: next }).where(eq(tasks.id, args.id))
}

マージセマンティクス:

  • ts が大きい方が勝ちます。
  • ts が同点の場合、clientID が大きい方(辞書順)が勝ちます — 決定的なタイブレークなので、すべてのレプリカが同じ値に収束します。
  • null / 未設定のレジスターはもう一方の側にマージされます。

Observed-remove 集合。同じ要素の並行 add + remove は「add が勝つ」に解決されます。remove は観測された add タグをトゥームストーン化するだけだからです。

const messages = table("messages", {
id: id(),
reactions: crdtOrSet<string>(),
})

orSetValues で読み取り:

const emoji = orSetValues<string>(row.reactions)
// ["👍", "❤️"]

orSetAdd(clientID, seq, value, current) / orSetRemove(value, current) で書き込み:

addReaction: async ({ db, args, clientID, mutationID }) => {
const rows = await db.select().from(messages).where(eq(messages.id, args.messageId))
const current = rows[0]?.reactions as OrSet<string> | undefined
const next = orSetAdd(clientID!, mutationID!, args.emoji, current)
await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))
}
removeReaction: async ({ db, args }) => {
const rows = await db.select().from(messages).where(eq(messages.id, args.messageId))
const current = rows[0]?.reactions as OrSet<string> | undefined
const next = orSetRemove(args.emoji, current)
await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))
}

すべての add に付く (clientID, seq) タグは、rebase 中の再実行でも同じタグを生成することを保証します — マージは冪等です。

2 つのタブ、一方が 👍 を追加し、もう一方が 👍 を削除:

タブ A: 👍 を追加 → { adds: [tagA] }
タブ B: 👍 を追加 → { adds: [tagB] }
タブ A: 👍 を削除 → { adds: [tagA], tombstones: [tagA] }
↑ tagA をトゥームストーン化するが、tagB は未観測
マージ: { adds: [tagA, tagB], tombstones: [tagA] }
読み取り: ["👍"] (tagB が生き残る)

メッセージ reaction に最適です: 「ユーザー A が like した」と「ユーザー B が自身の reaction を unlike した」の競合は、A の like を失いません。

ユーザーごとの reaction — key を正しくモデル化する

Section titled “ユーザーごとの reaction — key を正しくモデル化する”

message の reaction で crdtOrSet<string> に絵文字をそのまま入れると、微妙な落とし穴があります:

// ❌ 脆い — orSetRemove("👍") は「👍」に一致する全 tag を削るので
// 「A が unlike」で B の like も消える (merge で復元されるとはいえ、
// 中間の optimistic 状態は誤り)。
orSetRemove("👍", row.reactions)

要素を user 付きの複合キーに モデル化するのが正解:

// ✅ ユーザー単位の reaction。A の unlike は A 自身の entry のみ削除。
orSetAdd(clientID, mutationID, `${userId}:👍`, row.reactions)
orSetRemove(`${userId}:👍`, row.reactions)
// 読み取り: userId prefix を落として絵文字だけ、または保持して
// 「誰が何をリアクションしたか」を得る。
const emoji = orSetValues<string>(row.reactions)
.map((s) => s.split(":")[1])

Add-wins は tag レベルで働くので、どの interleaving でも収束します。

4 種類の column type は counter / register / set 状態をカバーします — per-user アプリで CRDT 形状の並行状態のほとんどはこれで足ります。

扱わない ものは:

  • テキスト / リッチテキスト (Yjs / Automerge / Loro 形状)。共同編集される文書本体は OR-Set や LWW では表せず、RGA / YATA / Fugue 型の sequence CRDT が必要。plasma v1.0 は同梱していません。回避策は、serialize した Yjs ドキュメントを file() / json() に opaque payload として持ち、編集のたび blob 丸ごとを sync する方法 — 収束はしますがコスト高く live-query との組み合わせも効きません。
  • 順序ある sequence / list (プレイリスト / ドラッグで並べ替え可能なタスク)。OR-Set はメンバーシップだけで順序を持ちません。
  • ツリー (nested outline / move 意味を持つフォルダ構造)。同じく in-CRDT 順序 primitive がないため対象外。

sequence / text CRDT は v1.1 候補としてキュー中 — Roadmap を参照。

サーバーエンジンは push 時に CRDT column を自動的にマージします。CRDT column に resolveConflict を書く必要はありません。sql-engine.tsmergeCrdtColumns 関数が、column の種類に基づいて mergeCrdtCounter / mergePnCounter / mergeLwwRegister / mergeOrSet にディスパッチします。

注意点: これは書き込み前に現在のサーバー行がフェッチされたときにのみ実行されます。sql-engine.ts は、書き込みが table のいずれかの CRDT column を触っているときにそれをフェッチします — executeUpdatetouchesCrdt が 4 種類すべてをカバーします。

知っておくべきセマンティクス

Section titled “知っておくべきセマンティクス”
  • 読み取り/書き込みの型の非対称性。 column の型は観測される値(例: カウンターでは Column<number>)ですが、書き込みはストレージ形状(CrdtCounterMapPnCounterMap、…)を渡さなければなりません。呼び出し側は .set({ counter: next as unknown as number }) と書きます。この非対称性を内部化する v1.1 の db.increment(col, delta) 高レベル API に予定されています。
  • column の種類の変更にはサーバー側の再作成が必要です。 column を crdtCounter から crdtPnCounter に切り替える(またはあらゆる種類の変更)ことは runMigrations に拒否されます。DROP TABLE して再 migrate するか、新しい名前の新しい table を出荷する必要があります。