サーバーサイド操作
plasma のメインストーリーはブラウザ + Worker の sync ループですが、 サーバー側にはクライアントに対応物がないいくつかの API があります: 管理ダッシュボード、cron 圧縮、履歴スナップショット、DO poke ヘルパー などです。この guide ではそれら全てを扱います。
serverLiveSelect — サーバーコードからのリアクティブクエリ
Section titled “serverLiveSelect — サーバーコードからのリアクティブクエリ”「全組織の todo 数を live に見せる管理ダッシュボード」が欲しい?
ブラウザ側の useLiveQuery は使えません — あれは IDB に対して走ります。
serverLiveSelect の出番です:
import { fromD1, serverLiveSelect } from "@sh1n4ps/plasma-server"
const executor = fromD1(env.DB)
const handle = serverLiveSelect({ executor, tables: ["todos", "users"], // これらのテーブルが変わったときだけ発火 pollIntervalMs: 1000, // ハートビート間隔
fetch: async () => { // 好きな SQL を書ける — auth.read はバイパスされるので // 管理 / 横断可視性用途に限定。 const rows = await executor.all({ text: "SELECT userId, COUNT(*) AS n FROM todos GROUP BY userId", params: [], }) return rows },
onChange: (rows) => { broadcastToAdmins(rows) // WebSocket / SSE 等へ push },})
// あとでhandle.stop()性質:
- Change log を poll する。
tablesのいずれかの行に新しいrow_versionが入るとfetchを再実行しonChangeを emit。 tablesフィルタは optional。 省略すると全書き込み可能テーブルに 反応。pollIntervalMsはハートビートも兼ねる。 静かな時間帯でもfetchが走るので、ランナーが生きていることを caller が確認できる。 デフォルト 1000ms。- Auth バイパス。 管理 / server-only 用途です。エンドユーザーに
結果を素直に返すなら
fetch内で行 auth 述語を自前で適用してください。 - subscriber ごとに 1 つの polling loop。 スケール優先なら
SyncCoordinator+ subscriber ごとのPlasmaClientに。serverLiveSelectはスケールとシンプルさのトレードオフです。
snapshotAsOf — 過去の状態をリプレイ
Section titled “snapshotAsOf — 過去の状態をリプレイ”任意のテーブルの過去のある時点での見え方を再構築:
import { snapshotAsOf } from "@sh1n4ps/plasma-server"
const past = await snapshotAsOf({ schema, executor: fromD1(env.DB), cookie: "12345", // row_version カーソル (文字列 or 数値)})
// past は Map<tableName, Map<rowId, row>>const todosThen = past.get("todos")for (const [id, row] of todosThen ?? []) { console.log(id, row)}しくみ: row_version <= cookie の全 _plasma_changes エントリを
リプレイ。cookie 前に削除された行は結果に含まれず、cookie 後に
更新された行は以前の値を返します。
コスト: change log エントリ数に比例 (O(N))。1000 万件書き込みが
あるテーブルでは体感できます — 全中間状態が必要でないなら
compactChangeLog を検討。
用途:
- 監査ダッシュボード (「30 日前の状態を表示」)
- コンプライアンス replay (「2026-01-15 の記録は?」)
- 「1 週間前と比較」UI
- デバッグ (「この行が X から Y になったのはいつ?」)
compactChangeLog — change log を刈り込む
Section titled “compactChangeLog — change log を刈り込む”書き込みごとに _plasma_changes に行が追加されます。時間が経つと log が
無制限に肥大化し pull が遅くなります。compactChangeLog は
(table_name, row_id) 単位で中間状態を最新値まで折りたたみます:
import { compactChangeLog } from "@sh1n4ps/plasma-server"
const result = await compactChangeLog({ executor: fromD1(env.DB), safeUpToVersion: 5000000, // cookie が この未満のクライアントは畳まれた state を見る})
console.log(`Removed ${result.removed} intermediate versions, kept ${result.kept} rows`)正しさ契約:
- Pull cookie が
safeUpToVersion以上のクライアントは影響なし — そもそもそれらの行を skip している。 - Cookie が
safeUpToVersion未満のクライアントは、行が畳まれた state にジャンプ。中間遷移は miss しますが truth には収束します。 snapshotAsOf(cookie < safeUpToVersion)の精度は落ちる — 中間履歴が消えるため。
safeUpToVersion の選び方:
- 理想:
min(全アクティブクライアントの cookie)。列挙できるなら最小値。 - 実務: 十分な safety margin を残す。「60 日前の最大 row_version」は 安全なヒューリスティック — 実運用では 60 日 stale なクライアントは ほぼいない。
Scheduled Worker から呼ぶ (週次が一般的):
export default { async scheduled(_event, env, ctx) { const executor = fromD1(env.DB) ctx.waitUntil(compactChangeLog({ executor, safeUpToVersion: await computeSafeCursor(executor), })) },}gcOrphanedBlobs — R2 ストレージの解放
Section titled “gcOrphanedBlobs — R2 ストレージの解放”file() column を持つ行を削除すると reference count が下がります。
count が 0 になった blob は GC 対象になりますが、再アタッチフローが
キャッシュに当たれるよう grace window が設けられます:
import { gcOrphanedBlobs, r2Storage } from "@sh1n4ps/plasma-server"
const result = await gcOrphanedBlobs({ executor: fromD1(env.DB), storage: r2Storage({ bucket: env.BUCKET }), minOrphanAgeMs: 7 * 24 * 3600 * 1000, // 7 日 grace limit: 500, // 1 回あたりの上限})
console.log(`Deleted ${result.deleted} blobs, ${result.remaining} more waiting`)Blob のライフサイクル詳細は Files and Blobs を 参照。
reconcileBlobRefs — raw DDL 後の再構築
Section titled “reconcileBlobRefs — raw DDL 後の再構築”管理スクリプトが createSyncHandler を経由せず user table に直接書く
と、_plasma_blob_refs のカウンタが drift します。修復:
import { reconcileBlobRefs } from "@sh1n4ps/plasma-server"
await reconcileBlobRefs({ schema, executor: fromD1(env.DB), dialect: sqliteDialect,})実務ではあまり使いませんが、移行後の sanity check やバルクインポート 後に取っておくと安心。
pokeCoordinator — サーバーコードから WebSocket poke を発火
Section titled “pokeCoordinator — サーバーコードから WebSocket poke を発火”SyncCoordinator DO は接続中の全 WebSocket に対して room 単位で
poke メッセージをファンアウトします。pokeCoordinator は DO stub
を自前で呼ばずにサーバーサイドから poke をトリガするヘルパーです:
import { pokeCoordinator } from "@sh1n4ps/plasma-server/coordinator"
// sync handler / cron / 外部 webhook 等からawait pokeCoordinator(env.COORDINATOR, { room: `group-${clientGroupID}`, token: env.POKE_TOKEN,})オプション:
room— poke する room。デフォルト"global"。クライアント側の scheme に合わせる (デフォルトはclientGroupID、doc 単位に override していれば その key)。token— 必須。Worker ネットワーク上の任意プロセスが poke を 引けないようにガード。共有秘密を設定してカスタム coordinator サブ クラスで検証する運用も可能。
利用パターン:
createSyncHandlerから: push 成功後に plasma が自動で poke。 sync ループのために手動で呼ぶ必要はありません。- Scheduled Worker から: バッチインポートや cron 更新後、影響 範囲の room を poke してデフォルト 5s poll を待たずに反映。
- 外部 webhook から: 外部サービスからの変更、poll timer を 待たず plasma クライアントに知らせたいとき。
createServerDb — sync handler 外での query builder
Section titled “createServerDb — sync handler 外での query builder”createServerDb は mutator 内と同じ Db<Schema> builder を、mutator
の外 (cron / 管理スクリプト / バックグラウンド aggregator) から使う
ための API です:
import { createServerDb, fromD1 } from "@sh1n4ps/plasma-server"
const db = createServerDb({ schema, executor: fromD1(env.DB), ctx: { userId: "admin", role: "admin" }, // auth が適用される (後述)})
const todos = await db.select().from(todos).where(...)Auth の挙動:
TableOptions.auth: { read, write }を宣言していれば、createServerDbは渡したctxに対して述語を評価します。- 管理用途で auth を bypass したいなら、寛容な述語を通せる ctx
(
{ userId: "*", role: "admin" }+auth.readをrole === "admin"で short-circuit) を渡します。
invokeMutator — サーバーサイドで mutator を実行
Section titled “invokeMutator — サーバーサイドで mutator を実行”名前指定でサーバーから mutator を呼ぶ (cron / webhook / 管理ツール):
import { invokeMutator } from "@sh1n4ps/plasma-core"
await invokeMutator(mutators, "markDone", { db: createServerDb({ schema, executor, ctx: { userId: "system" } }), args: { id: "t1", updatedAt: Date.now() }, ctx: { userId: "system" }, clientID: "system", mutationID: 0,})書き込みは _plasma_changes に記録され、クライアントの次回 pull で
配送されます — wire 経由の mutation と同じ扱いです。
用途:
- スケジュール更新 (stale todo を done にする、trial を expire する等)。
- 外部サービスからの webhook。
- 監査履歴に載せたい管理操作。
タスク別の API 選択
Section titled “タスク別の API 選択”| タスク | 使う API |
|---|---|
| 管理ダッシュボードの live 表示 | serverLiveSelect |
| 履歴 / 監査 replay | snapshotAsOf |
| Change log の肥大化 | compactChangeLog |
| R2 使用量の増加 | gcOrphanedBlobs |
| DDL 後 blob refs drift | reconcileBlobRefs |
| WebSocket poke を手動で送る | pokeCoordinator |
| Cron から query builder を使う | createServerDb |
| Cron から mutator を実行する | invokeMutator |
次に読むページ
Section titled “次に読むページ”- Deployment — cron 系 API の
scheduledハンドラ配線 - Files and Blobs — Blob GC のライフサイクル
- Migrations — schema 変更の
runMigrations - Presence — coordinator 近傍のもう 1 つの API