コンテンツにスキップ

サーバーサイド操作

plasma のメインストーリーはブラウザ + Worker の sync ループですが、 サーバー側にはクライアントに対応物がないいくつかの API があります: 管理ダッシュボード、cron 圧縮、履歴スナップショット、DO poke ヘルパー などです。この guide ではそれら全てを扱います。

serverLiveSelect — サーバーコードからのリアクティブクエリ

Section titled “serverLiveSelect — サーバーコードからのリアクティブクエリ”

「全組織の todo 数を live に見せる管理ダッシュボード」が欲しい? ブラウザ側の useLiveQuery は使えません — あれは IDB に対して走ります。 serverLiveSelect の出番です:

import { fromD1, serverLiveSelect } from "@sh1n4ps/plasma-server"
const executor = fromD1(env.DB)
const handle = serverLiveSelect({
executor,
tables: ["todos", "users"], // これらのテーブルが変わったときだけ発火
pollIntervalMs: 1000, // ハートビート間隔
fetch: async () => {
// 好きな SQL を書ける — auth.read はバイパスされるので
// 管理 / 横断可視性用途に限定。
const rows = await executor.all({
text: "SELECT userId, COUNT(*) AS n FROM todos GROUP BY userId",
params: [],
})
return rows
},
onChange: (rows) => {
broadcastToAdmins(rows) // WebSocket / SSE 等へ push
},
})
// あとで
handle.stop()

性質:

  • Change log を poll する。 tables のいずれかの行に新しい row_version が入ると fetch を再実行し onChange を emit。
  • tables フィルタは optional。 省略すると全書き込み可能テーブルに 反応。
  • pollIntervalMs はハートビートも兼ねる。 静かな時間帯でも fetch が走るので、ランナーが生きていることを caller が確認できる。 デフォルト 1000ms。
  • Auth バイパス。 管理 / server-only 用途です。エンドユーザーに 結果を素直に返すなら fetch 内で行 auth 述語を自前で適用してください。
  • subscriber ごとに 1 つの polling loop。 スケール優先なら SyncCoordinator + subscriber ごとの PlasmaClient に。 serverLiveSelect はスケールとシンプルさのトレードオフです。

snapshotAsOf — 過去の状態をリプレイ

Section titled “snapshotAsOf — 過去の状態をリプレイ”

任意のテーブルの過去のある時点での見え方を再構築:

import { snapshotAsOf } from "@sh1n4ps/plasma-server"
const past = await snapshotAsOf({
schema,
executor: fromD1(env.DB),
cookie: "12345", // row_version カーソル (文字列 or 数値)
})
// past は Map<tableName, Map<rowId, row>>
const todosThen = past.get("todos")
for (const [id, row] of todosThen ?? []) {
console.log(id, row)
}

しくみ: row_version <= cookie の全 _plasma_changes エントリを リプレイ。cookie 前に削除された行は結果に含まれず、cookie 後に 更新された行は以前の値を返します。

コスト: change log エントリ数に比例 (O(N))。1000 万件書き込みが あるテーブルでは体感できます — 全中間状態が必要でないなら compactChangeLog を検討。

用途:

  • 監査ダッシュボード (「30 日前の状態を表示」)
  • コンプライアンス replay (「2026-01-15 の記録は?」)
  • 「1 週間前と比較」UI
  • デバッグ (「この行が X から Y になったのはいつ?」)

compactChangeLog — change log を刈り込む

Section titled “compactChangeLog — change log を刈り込む”

書き込みごとに _plasma_changes に行が追加されます。時間が経つと log が 無制限に肥大化し pull が遅くなります。compactChangeLog(table_name, row_id) 単位で中間状態を最新値まで折りたたみます:

import { compactChangeLog } from "@sh1n4ps/plasma-server"
const result = await compactChangeLog({
executor: fromD1(env.DB),
safeUpToVersion: 5000000, // cookie が この未満のクライアントは畳まれた state を見る
})
console.log(`Removed ${result.removed} intermediate versions, kept ${result.kept} rows`)

正しさ契約:

  • Pull cookie が safeUpToVersion 以上のクライアントは影響なし — そもそもそれらの行を skip している。
  • Cookie が safeUpToVersion 未満のクライアントは、行が畳まれた state にジャンプ。中間遷移は miss しますが truth には収束します。
  • snapshotAsOf(cookie < safeUpToVersion) の精度は落ちる — 中間履歴が消えるため。

safeUpToVersion の選び方:

  • 理想: min(全アクティブクライアントの cookie)。列挙できるなら最小値。
  • 実務: 十分な safety margin を残す。「60 日前の最大 row_version」は 安全なヒューリスティック — 実運用では 60 日 stale なクライアントは ほぼいない。

Scheduled Worker から呼ぶ (週次が一般的):

export default {
async scheduled(_event, env, ctx) {
const executor = fromD1(env.DB)
ctx.waitUntil(compactChangeLog({
executor,
safeUpToVersion: await computeSafeCursor(executor),
}))
},
}

gcOrphanedBlobs — R2 ストレージの解放

Section titled “gcOrphanedBlobs — R2 ストレージの解放”

file() column を持つ行を削除すると reference count が下がります。 count が 0 になった blob は GC 対象になりますが、再アタッチフローが キャッシュに当たれるよう grace window が設けられます:

import { gcOrphanedBlobs, r2Storage } from "@sh1n4ps/plasma-server"
const result = await gcOrphanedBlobs({
executor: fromD1(env.DB),
storage: r2Storage({ bucket: env.BUCKET }),
minOrphanAgeMs: 7 * 24 * 3600 * 1000, // 7 日 grace
limit: 500, // 1 回あたりの上限
})
console.log(`Deleted ${result.deleted} blobs, ${result.remaining} more waiting`)

Blob のライフサイクル詳細は Files and Blobs を 参照。

reconcileBlobRefs — raw DDL 後の再構築

Section titled “reconcileBlobRefs — raw DDL 後の再構築”

管理スクリプトが createSyncHandler を経由せず user table に直接書く と、_plasma_blob_refs のカウンタが drift します。修復:

import { reconcileBlobRefs } from "@sh1n4ps/plasma-server"
await reconcileBlobRefs({
schema,
executor: fromD1(env.DB),
dialect: sqliteDialect,
})

実務ではあまり使いませんが、移行後の sanity check やバルクインポート 後に取っておくと安心。

pokeCoordinator — サーバーコードから WebSocket poke を発火

Section titled “pokeCoordinator — サーバーコードから WebSocket poke を発火”

SyncCoordinator DO は接続中の全 WebSocket に対して room 単位で poke メッセージをファンアウトします。pokeCoordinator は DO stub を自前で呼ばずにサーバーサイドから poke をトリガするヘルパーです:

import { pokeCoordinator } from "@sh1n4ps/plasma-server/coordinator"
// sync handler / cron / 外部 webhook 等から
await pokeCoordinator(env.COORDINATOR, {
room: `group-${clientGroupID}`,
token: env.POKE_TOKEN,
})

オプション:

  • room — poke する room。デフォルト "global"。クライアント側の scheme に合わせる (デフォルトは clientGroupID、doc 単位に override していれば その key)。
  • token — 必須。Worker ネットワーク上の任意プロセスが poke を 引けないようにガード。共有秘密を設定してカスタム coordinator サブ クラスで検証する運用も可能。

利用パターン:

  • createSyncHandler から: push 成功後に plasma が自動で poke。 sync ループのために手動で呼ぶ必要はありません。
  • Scheduled Worker から: バッチインポートや cron 更新後、影響 範囲の room を poke してデフォルト 5s poll を待たずに反映。
  • 外部 webhook から: 外部サービスからの変更、poll timer を 待たず plasma クライアントに知らせたいとき。

createServerDb — sync handler 外での query builder

Section titled “createServerDb — sync handler 外での query builder”

createServerDb は mutator 内と同じ Db<Schema> builder を、mutator の外 (cron / 管理スクリプト / バックグラウンド aggregator) から使う ための API です:

import { createServerDb, fromD1 } from "@sh1n4ps/plasma-server"
const db = createServerDb({
schema,
executor: fromD1(env.DB),
ctx: { userId: "admin", role: "admin" }, // auth が適用される (後述)
})
const todos = await db.select().from(todos).where(...)

Auth の挙動:

  • TableOptions.auth: { read, write } を宣言していれば、 createServerDb は渡した ctx に対して述語を評価します。
  • 管理用途で auth を bypass したいなら、寛容な述語を通せる ctx ({ userId: "*", role: "admin" } + auth.readrole === "admin" で short-circuit) を渡します。

invokeMutator — サーバーサイドで mutator を実行

Section titled “invokeMutator — サーバーサイドで mutator を実行”

名前指定でサーバーから mutator を呼ぶ (cron / webhook / 管理ツール):

import { invokeMutator } from "@sh1n4ps/plasma-core"
await invokeMutator(mutators, "markDone", {
db: createServerDb({ schema, executor, ctx: { userId: "system" } }),
args: { id: "t1", updatedAt: Date.now() },
ctx: { userId: "system" },
clientID: "system",
mutationID: 0,
})

書き込みは _plasma_changes に記録され、クライアントの次回 pull で 配送されます — wire 経由の mutation と同じ扱いです。

用途:

  • スケジュール更新 (stale todo を done にする、trial を expire する等)。
  • 外部サービスからの webhook。
  • 監査履歴に載せたい管理操作。
タスク 使う API
管理ダッシュボードの live 表示 serverLiveSelect
履歴 / 監査 replay snapshotAsOf
Change log の肥大化 compactChangeLog
R2 使用量の増加 gcOrphanedBlobs
DDL 後 blob refs drift reconcileBlobRefs
WebSocket poke を手動で送る pokeCoordinator
Cron から query builder を使う createServerDb
Cron から mutator を実行する invokeMutator
  • Deployment — cron 系 API の scheduled ハンドラ配線
  • Files and Blobs — Blob GC のライフサイクル
  • Migrations — schema 変更の runMigrations
  • Presence — coordinator 近傍のもう 1 つの API