CRDT Columns

CRDT (Conflict-free Replicated Data Type) columns let two clients write to the same row concurrently and merge losslessly. plasma ships four: a grow-only counter, a signed counter, a last-writer-wins register, and an observed-remove set.

Consider two tabs incrementing a counter:

Tab A reads counter = 5, sets counter = 6.
Tab B reads counter = 5, sets counter = 6.
Server sees two "set to 6" writes and last-write-wins → counter = 6.
Actual: both tabs incremented; correct value should be 7.

CRDT columns fix this by storing not “the current value” but a structure that lets the server merge concurrent contributions losslessly. Starting from an empty counter, two clients that each increment once leave crdtCounter holding { clientA: 1, clientB: 1 }, which reads back as sum = 2. The server converges to that value regardless of the order in which the two contributions arrive.

crdtCounter is a grow-only counter. Every client owns its own slot and only ever increases it. The observed value is the sum of every slot.

const teamStats = table("teamStats", {
  id: id(),
  completions: crdtCounter(),
})

Read via sumCrdtCounter:

const stats = useLiveQuery(() => db.select().from(teamStats), [])
const total = sumCrdtCounter(stats[0]?.completions)

Write via crdtIncrement:

export const mutators = defineMutators<typeof schema, Ctx>()({
  logCompletion: async ({ db, args, clientID }) => {
    // Read the current storage shape, bump this client's slot, write it back.
    const rows = await db.select().from(teamStats).where(eq(teamStats.id, args.id))
    const current = rows[0]?.completions as CrdtCounterMap | undefined
    const next = crdtIncrement(clientID!, 1, current)
    await db.update(teamStats).set({ completions: next }).where(eq(teamStats.id, args.id))
  },
})

Constraint: crdtIncrement only accepts positive deltas. A counter that must also decrease needs crdtPnCounter instead.
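The slot-per-client idea is small enough to sketch end to end. The snippet below is a minimal illustration, not plasma's implementation: GCounter, gIncrement, gMerge and gSum are hypothetical names, the per-slot-max merge is the textbook G-Counter rule, and throwing on a non-positive delta is an assumption about how a rejected delta might surface.

```typescript
// Hypothetical stand-ins for illustration; not plasma's actual code.
type GCounter = Readonly<Record<string, number>>

// Add a positive delta to the caller's own slot and return a new map.
function gIncrement(clientID: string, delta: number, current: GCounter = {}): GCounter {
  // Assumption: non-positive (or NaN) deltas are rejected by throwing.
  if (!(delta > 0)) throw new RangeError("grow-only counter: delta must be positive")
  return { ...current, [clientID]: (current[clientID] ?? 0) + delta }
}

// Merge two replicas by taking the per-slot maximum. Slots only grow,
// so the larger number always subsumes the smaller one.
function gMerge(a: GCounter, b: GCounter): GCounter {
  const out: Record<string, number> = { ...a }
  for (const id of Object.keys(b)) {
    out[id] = Math.max(out[id] ?? 0, b[id] ?? 0)
  }
  return out
}

// Observed value: the sum of every slot.
function gSum(c: GCounter): number {
  return Object.keys(c).reduce((acc, id) => acc + (c[id] ?? 0), 0)
}

// Two tabs start from the same state (observed value 5) and each increments once.
const base: GCounter = { seed: 5 }
const fromTabA = gIncrement("tabA", 1, base)
const fromTabB = gIncrement("tabB", 1, base)

// Either merge order converges to 7, unlike the "set to 6" race.
const mergedAB = gSum(gMerge(fromTabA, fromTabB))
const mergedBA = gSum(gMerge(fromTabB, fromTabA))
console.log(mergedAB, mergedBA) // 7 7
```

Because each tab writes only to its own slot, neither write can overwrite the other, and the merge is commutative by construction.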

crdtPnCounter is a signed counter. Under the hood it’s two G-Counters (grow-only counters): one for positive contributions (p), one for negative (n). Observed value = sum(p) - sum(n).

const stats = table("stats", {
  id: id(),
  active: crdtPnCounter(),
})

Read via pnRead:

const total = pnRead(row.active)
// 3 (which could be p=5, n=2 or p=4, n=1 — semantics don't care)

Write via pnIncrement (positive delta) / pnDecrement (positive delta interpreted as decrement):

completeTask: async ({ db, args, clientID }) => {
  // Completing a task lowers the number of active tasks.
  const rows = await db.select().from(stats).where(eq(stats.id, "team"))
  const current = rows[0]?.active as PnCounterMap | undefined
  const next = pnDecrement(clientID!, 1, current)
  await db.update(stats).set({ active: next }).where(eq(stats.id, "team"))
},
reopenTask: async ({ db, args, clientID }) => {
  // Reopening a task raises it again.
  const rows = await db.select().from(stats).where(eq(stats.id, "team"))
  const current = rows[0]?.active as PnCounterMap | undefined
  const next = pnIncrement(clientID!, 1, current)
  await db.update(stats).set({ active: next }).where(eq(stats.id, "team"))
},

Zero deltas are a no-op (no allocation churn).
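To see why two grow-only halves are enough for a signed value, here is a rough sketch of the p/n structure. All names (PnSketch, pnAddSketch, pnSubSketch, pnMergeSketch, pnValueSketch) are hypothetical, and the zero-delta shortcut simply returns the input object, which is one plausible reading of "no allocation churn" rather than a description of plasma's internals.

```typescript
// Hypothetical stand-ins for illustration; not plasma's actual code.
type Slots = Readonly<Record<string, number>>
interface PnSketch { readonly p: Slots; readonly n: Slots }

const EMPTY_PN: PnSketch = { p: {}, n: {} }

function bump(slots: Slots, clientID: string, delta: number): Slots {
  return { ...slots, [clientID]: (slots[clientID] ?? 0) + delta }
}

// Increment: grows the caller's slot in the positive half.
function pnAddSketch(clientID: string, delta: number, cur: PnSketch = EMPTY_PN): PnSketch {
  if (!(delta >= 0)) throw new RangeError("delta must be non-negative")
  if (delta === 0) return cur // zero delta: hand back the same object
  return { p: bump(cur.p, clientID, delta), n: cur.n }
}

// Decrement: grows the caller's slot in the negative half.
function pnSubSketch(clientID: string, delta: number, cur: PnSketch = EMPTY_PN): PnSketch {
  if (!(delta >= 0)) throw new RangeError("delta must be non-negative")
  if (delta === 0) return cur
  return { p: cur.p, n: bump(cur.n, clientID, delta) }
}

// Each half merges like a grow-only counter: per-slot maximum.
function maxMerge(a: Slots, b: Slots): Slots {
  const out: Record<string, number> = { ...a }
  for (const id of Object.keys(b)) out[id] = Math.max(out[id] ?? 0, b[id] ?? 0)
  return out
}

function pnMergeSketch(a: PnSketch, b: PnSketch): PnSketch {
  return { p: maxMerge(a.p, b.p), n: maxMerge(a.n, b.n) }
}

function sumSlots(s: Slots): number {
  return Object.keys(s).reduce((acc, id) => acc + (s[id] ?? 0), 0)
}

// Observed value = sum(p) - sum(n).
function pnValueSketch(c: PnSketch): number {
  return sumSlots(c.p) - sumSlots(c.n)
}

// Client A adds 5 while client B concurrently subtracts 2.
const fromA = pnAddSketch("clientA", 5)
const fromB = pnSubSketch("clientB", 2)
const pnResult = pnValueSketch(pnMergeSketch(fromA, fromB))
console.log(pnResult) // 3
```

Neither half ever shrinks, so the per-slot-max merge stays valid even though the observed value can go down.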

crdtLwwRegister is a last-writer-wins register with a (ts, clientID) tiebreak. It suits fields where “one client’s value should win” rather than “the contributions should merge”, e.g. a status label, an assignee, a pinned flag.

const tasks = table("tasks", {
  id: id(),
  status: crdtLwwRegister<"open" | "in-progress" | "done">(),
})

Read via lwwRead:

const status = lwwRead(row.status, "open") // fallback if unset

Write via lwwSet — you supply the timestamp so the primitive stays pure and easily mockable:

setStatus: async ({ db, args, clientID }) => {
  const rows = await db.select().from(tasks).where(eq(tasks.id, args.id))
  const current = rows[0]?.status as LwwRegister<string> | null | undefined
  // The caller supplies the timestamp; lwwSet itself never reads the clock.
  const next = lwwSet(clientID!, args.status, Date.now(), current)
  await db.update(tasks).set({ status: next }).where(eq(tasks.id, args.id))
}

Merge semantics:

  • Higher ts wins.
  • On ts tie, larger clientID (lexicographic) wins — deterministic tiebreak so every replica converges to the same value.
  • null / unset register merges to the other side.
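The three rules above translate almost directly into code. This is a minimal sketch under stated assumptions: LwwSketch and lwwMergeSketch are hypothetical names, and the field layout (value, ts, clientID) is a guess at a plausible storage shape, not plasma's actual one.

```typescript
// Hypothetical shape for illustration; plasma's stored register may differ.
interface LwwSketch<T> {
  readonly value: T
  readonly ts: number
  readonly clientID: string
}

function lwwMergeSketch<T>(
  a: LwwSketch<T> | null | undefined,
  b: LwwSketch<T> | null | undefined,
): LwwSketch<T> | null {
  // Rule 3: a null / unset register merges to the other side.
  if (a == null) return b ?? null
  if (b == null) return a
  // Rule 1: the higher timestamp wins.
  if (a.ts !== b.ts) return a.ts > b.ts ? a : b
  // Rule 2: on a tie, the lexicographically larger clientID wins.
  return a.clientID >= b.clientID ? a : b
}

const older: LwwSketch<string> = { value: "open", ts: 100, clientID: "client-b" }
const newer: LwwSketch<string> = { value: "done", ts: 200, clientID: "client-a" }
const tied: LwwSketch<string> = { value: "in-progress", ts: 200, clientID: "client-0" }

console.log(lwwMergeSketch(older, newer)?.value) // "done" (higher ts)
console.log(lwwMergeSketch(newer, older)?.value) // "done" (same result in either order)
console.log(lwwMergeSketch(tied, newer)?.value)  // "done" ("client-a" > "client-0")
console.log(lwwMergeSketch(null, older)?.value)  // "open" (unset side yields)
```

The tiebreak is what makes the result independent of argument order: without it, two writes with the same ts could resolve differently on different replicas.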

crdtOrSet is an observed-remove set. A concurrent add and remove of the same element resolves to “add wins” because the remove only tombstones the add tags it has observed.

const messages = table("messages", {
  id: id(),
  reactions: crdtOrSet<string>(),
})

Read via orSetValues (list) or orSetHas (membership):

const emoji = orSetValues<string>(row.reactions)
// ["👍", "❤️"]
// Membership check without iterating:
const hasHeart = orSetHas(row.reactions, "❤️")

Write via orSetAdd(clientID, seq, value, current) / orSetRemove(value, current):

addReaction: async ({ db, args, clientID, mutationID }) => {
  const rows = await db.select().from(messages).where(eq(messages.id, args.messageId))
  const current = rows[0]?.reactions as OrSet<string> | undefined
  // mutationID serves as the per-client sequence number for the add tag.
  const next = orSetAdd(clientID!, mutationID!, args.emoji, current)
  await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))
},
removeReaction: async ({ db, args }) => {
  const rows = await db.select().from(messages).where(eq(messages.id, args.messageId))
  const current = rows[0]?.reactions as OrSet<string> | undefined
  const next = orSetRemove(args.emoji, current)
  await db.update(messages).set({ reactions: next }).where(eq(messages.id, args.messageId))
},

Every add is tagged with (clientID, seq). The example above passes mutationID as seq, so a mutator re-run during rebase produces the same tag and the merge stays idempotent.

Two tabs both add 👍, then Tab A removes it before it has seen Tab B’s add:

Tab A: adds 👍 → { adds: [tagA] }
Tab B: adds 👍 → { adds: [tagB] }
Tab A: removes 👍 → { adds: [tagA], tombstones: [tagA] }
↑ tombstone tagA, but tagB is unobserved
Merge: { adds: [tagA, tagB], tombstones: [tagA] }
Read: ["👍"] (tagB survives)

Perfect for message reactions: a race between “user A liked” and “user B unliked their own reaction” doesn’t lose A’s like.
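The trace above can be replayed with a small model of the set. This is a sketch, assuming a state made of an adds list and a tombstones list as the trace suggests; OrSetState and the sketch* functions are hypothetical names, and the `clientID:seq` string tag is an illustrative encoding, not necessarily plasma's.

```typescript
// Hypothetical model for illustration; not plasma's actual storage shape.
interface Entry<T> { readonly tag: string; readonly value: T }
interface OrSetState<T> {
  readonly adds: readonly Entry<T>[]
  readonly tombstones: readonly string[]
}

function sketchAdd<T>(clientID: string, seq: number, value: T, s: OrSetState<T>): OrSetState<T> {
  const tag = `${clientID}:${seq}`
  // Re-running the same (clientID, seq) yields the same tag, so nothing changes.
  if (s.adds.some((e) => e.tag === tag)) return s
  return { adds: s.adds.concat([{ tag, value }]), tombstones: s.tombstones }
}

function sketchRemove<T>(value: T, s: OrSetState<T>): OrSetState<T> {
  // Tombstone only the tags this replica has observed for the value.
  const observed = s.adds
    .filter((e) => e.value === value && s.tombstones.indexOf(e.tag) === -1)
    .map((e) => e.tag)
  return { adds: s.adds, tombstones: s.tombstones.concat(observed) }
}

function sketchMerge<T>(a: OrSetState<T>, b: OrSetState<T>): OrSetState<T> {
  // Union of adds (by tag) and union of tombstones.
  const adds = a.adds.concat(b.adds.filter((e) => !a.adds.some((x) => x.tag === e.tag)))
  const tombstones = a.tombstones.concat(
    b.tombstones.filter((t) => a.tombstones.indexOf(t) === -1),
  )
  return { adds, tombstones }
}

function sketchValues<T>(s: OrSetState<T>): T[] {
  // A value is present if at least one of its tags has no tombstone.
  const out: T[] = []
  for (const e of s.adds) {
    const live = s.tombstones.indexOf(e.tag) === -1
    if (live && out.indexOf(e.value) === -1) out.push(e.value)
  }
  return out
}

const start: OrSetState<string> = { adds: [], tombstones: [] }
const tabA1 = sketchAdd("tabA", 1, "👍", start) // adds: [tabA:1]
const tabB1 = sketchAdd("tabB", 1, "👍", start) // adds: [tabB:1]
const tabA2 = sketchRemove("👍", tabA1)         // tombstones tabA:1 only
const mergedSet = sketchMerge(tabA2, tabB1)
console.log(sketchValues(mergedSet)) // ["👍"], because tabB:1 has no tombstone
```

The remove never mentions tabB:1 because Tab A had not seen it, which is exactly why the concurrent add survives the merge.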

Per-user reactions — model the key correctly

crdtOrSet<string> on message reactions has a subtle failure mode if you use the emoji as the set element:

// ❌ Fragile: orSetRemove("👍") tombstones EVERY observed tag matching "👍",
// so "A unlikes" wipes B's like too once A's replica has observed it
// (add-wins only protects adds that A has not seen yet).
orSetRemove("👍", row.reactions)

The fix is to model the element as a compound key with the user attached:

// ✅ Per-user reaction. A unliking only removes A's own entry.
orSetAdd(clientID, mutationID, `${userId}:👍`, row.reactions)
orSetRemove(`${userId}:👍`, row.reactions)
// Read: strip the userId prefix to get emoji only, or keep it if
// you want "who reacted with what".
const emoji = orSetValues<string>(row.reactions)
  .map((s) => s.split(":")[1])

Add-wins still applies at the tag level, so this stays convergent under any interleaving.
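For rendering, the compound keys usually need to be turned back into per-emoji counts. The helpers below are a hypothetical sketch (reactionKey, parseReactionKey and countByEmoji are not part of plasma) and assume that userId never contains ":". They split on the first ":" only, so an emoji written as a shortcode such as ":tada:" survives intact.

```typescript
// Hypothetical helpers; they assume userId never contains ":".
function reactionKey(userId: string, emoji: string): string {
  return `${userId}:${emoji}`
}

function parseReactionKey(key: string): { userId: string; emoji: string } | null {
  // Split on the FIRST ":" so the emoji part may itself contain colons.
  const i = key.indexOf(":")
  if (i <= 0) return null // no separator, or empty userId
  return { userId: key.slice(0, i), emoji: key.slice(i + 1) }
}

// Collapse per-user keys into "emoji -> number of users who reacted".
function countByEmoji(keys: readonly string[]): Record<string, number> {
  const counts: Record<string, number> = {}
  for (const key of keys) {
    const parsed = parseReactionKey(key)
    if (parsed === null) continue // skip malformed entries
    counts[parsed.emoji] = (counts[parsed.emoji] ?? 0) + 1
  }
  return counts
}

// In an app, this array would come from orSetValues<string>(row.reactions).
const reactionKeys = [
  reactionKey("u1", "👍"),
  reactionKey("u2", "👍"),
  reactionKey("u1", "❤️"),
]
const reactionCounts = countByEmoji(reactionKeys)
console.log(reactionCounts) // { "👍": 2, "❤️": 1 }
```

Since the set deduplicates elements, each user contributes at most one entry per emoji, so the counts are per-user counts rather than per-click counts.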

The four column types cover counter / register / set state — which is most of what CRDT-shaped concurrent state looks like in per-user apps.

They don’t cover:

  • Text / rich-text (Yjs, Automerge, Loro shapes). A collaboratively-edited document body isn’t expressible as OR-Set or LWW — you need an RGA / YATA / Fugue-style sequence CRDT. plasma doesn’t ship one in v1.0. The workaround is to store the serialised Yjs document as an opaque payload in a file() or json() column and sync the whole blob per edit; that gets you eventual consistency but isn’t cheap and doesn’t compose with live-query subscriptions.
  • Sequences / ordered lists where order is significant (playlist, drag-reorderable tasks). OR-Set gives you membership but no order.
  • Trees (nested outline, folder structure with move semantics). Same reason — no in-CRDT ordering primitive.

Sequence and text CRDTs are queued as a v1.1 candidate; track the Roadmap.

The server engine merges CRDT columns automatically on push. You don’t write a resolveConflict for CRDT columns; the mergeCrdtColumns function in sql-engine.ts dispatches to mergeCrdtCounter / mergePnCounter / mergeLwwRegister / mergeOrSet based on the column kind.

The catch: the merge only runs when the current server row is fetched before the write. sql-engine.ts fetches it whenever the write touches any CRDT column of the table; touchesCrdt in executeUpdate covers all four kinds.
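The shape of that dispatch, and the reason the current row has to be fetched first, can be illustrated with a toy version. This is not the code in sql-engine.ts: ColumnKind, mergeRowSketch and the two cell-merge helpers are hypothetical, and only two of the four kinds are modelled to keep the sketch short.

```typescript
// Toy model of "merge incoming write against the current server row".
// Every name here is hypothetical; it is not plasma's engine code.
type ColumnKind = "counter" | "lww" | "plain"
type CounterCell = Record<string, number>
interface LwwCell { value: unknown; ts: number; clientID: string }

function mergeCounterCell(server: CounterCell, incoming: CounterCell): CounterCell {
  const out: CounterCell = { ...server }
  for (const id of Object.keys(incoming)) {
    out[id] = Math.max(out[id] ?? 0, incoming[id] ?? 0)
  }
  return out
}

function mergeLwwCell(server: LwwCell, incoming: LwwCell): LwwCell {
  if (server.ts !== incoming.ts) return server.ts > incoming.ts ? server : incoming
  return server.clientID >= incoming.clientID ? server : incoming
}

function mergeRowSketch(
  kinds: Record<string, ColumnKind>,
  serverRow: Record<string, unknown>,
  incoming: Record<string, unknown>,
): Record<string, unknown> {
  const out: Record<string, unknown> = { ...serverRow }
  for (const col of Object.keys(incoming)) {
    const kind = kinds[col] ?? "plain"
    const cur = serverRow[col]
    const next = incoming[col]
    // Plain columns, or CRDT columns with no prior value, take the incoming value.
    if (kind === "plain" || cur == null) {
      out[col] = next
      continue
    }
    // CRDT columns dispatch on kind and merge instead of overwriting.
    switch (kind) {
      case "counter":
        out[col] = mergeCounterCell(cur as CounterCell, next as CounterCell)
        break
      case "lww":
        out[col] = mergeLwwCell(cur as LwwCell, next as LwwCell)
        break
    }
  }
  return out
}

const columnKinds: Record<string, ColumnKind> = { completions: "counter" }
const serverRow = { id: "team", title: "Q3", completions: { a: 2 } }
const incomingWrite = { completions: { a: 1, b: 1 } } // built from a stale read of slot a

const mergedRow = mergeRowSketch(columnKinds, serverRow, incomingWrite)
console.log(JSON.stringify(mergedRow.completions)) // {"a":2,"b":1}
```

Without the fetched server row there is nothing to merge against, and the stale `a: 1` would simply overwrite `a: 2`.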

  • Read/write type asymmetry. Column type is the observed value (e.g. Column<number> for counters), but writes must pass the storage shape (CrdtCounterMap, PnCounterMap, …). Callers write .set({ counter: next as unknown as number }). Queued for v1.1’s db.increment(col, delta) high-level API which internalises this asymmetry.
  • Changing a column’s kind requires a server-side recreate. Switching a column from crdtCounter to crdtPnCounter (or any kind change) is refused by runMigrations. You need to DROP TABLE and re-migrate, or ship a new table with a new name.