コンテンツにスキップ

Migrations

plasma には plasma generate CLI も、schema diff ファイルも、バージョンごとの migration スクリプトもありません。schema の進化は、2 つの関数と 1 つの文字列によってランタイムで行われます:

  • ensureSchema({ schema, executor }) — table が存在しなければ作成します。冪等です。
  • runMigrations({ schema, executor }) — ライブ DB を宣言された schema と照合します。新しい column を追加し、トリガーを再生成し、削除を拒否します。
  • SCHEMA_VERSION — クライアントとサーバーの両方がエクスポートする文字列です。不一致は明確に定義されたハンドシェイクを引き起こします。

Worker の fetch ハンドラでこれを呼びます:

export default {
async fetch(req: Request, env: Env) {
const executor = fromD1(env.DB)
await ensureSchema({ schema, executor })
return createSyncHandler({ ... })(req)
},
}

ensureSchema は、すべてのユーザー table とすべての plasma 内部 table(_plasma_changes_plasma_client_mutations_plasma_origin_context_plasma_blobs_plasma_blob_refs_plasma_region_versions)に対して CREATE TABLE IF NOT EXISTS を実行します。また、change log を埋める AFTER INSERT/UPDATE/DELETE トリガーもインストールします。

すべてのリクエストで呼んでも安全です。トリガーの DDL は CREATE TRIGGER IF NOT EXISTS なので、再実行は安価です。

進化のためには runMigrations を呼びます(通常はすべてのリクエストではなく、1 回限りの管理ルートやデプロイフックから):

admin.ts
import { runMigrations } from "@sh1n4ps/plasma-server"
export async function POST(req: Request, env: Env) {
// まず auth チェック — これはやや破壊的な呼び出し。
if (req.headers.get("x-admin-token") !== env.ADMIN_TOKEN) {
return new Response("forbidden", { status: 403 })
}
const executor = fromD1(env.DB)
const result = await runMigrations({ schema, executor })
return Response.json(result)
}

runMigrations は宣言された schema とイントロスペクトした DB の間の SchemaDiff を計算し、次を行います:

  1. 宣言された側には存在するが DB には存在しない column を追加します
  2. ペイロードの json_object(...) が現在の column セットを反映するよう すべてのトリガーを再生成します(drop + create)。この実行で追加された column は、次の書き込みが change log を埋める前にトリガーの更新が必要です。
  3. 削除を拒否します。 schema が table や column を削除した場合、runMigrations はデータを静かに失う代わりに、問題の diff とともに MigrationRefused を throw します。
変更 許可 備考
新しい table の追加 完全な CREATE が実行され、トリガーがインストールされる。
新しい column の追加 ALTER TABLE ADD COLUMN。nullable な column とデフォルトを持つ column が動作する。
既存 column への .default() の追加 表面的な変更; schema はそれを認識するが、既存の行は格納された値を保持する。
既存 column への .nullable() の追加 ⚠️ DB 制約が NOT NULL から NULL に変わる。一部のドライバは table の再構築を必要とする — plasma はこれを自動化しない。手動 DDL が必要。
.nullable() な column を NOT NULL にする runMigrations は拒否する — DB に新しい制約に違反する NULL を持つ行があるかもしれない。
既存 column への .unique() の追加 ⚠️ 一意性は今後強制されるが、既存の重複行は検出されない。
新しい mutator の追加 mutator は DB の概念ではない — コード内にのみ存在する。
新しい file() column の追加 トリガーが更新され、_plasma_blob_refs が自己維持される。
table の削除 拒否される。
column の削除 拒否される。
column のリネーム 拒否される(削除と追加の両方)。
column の種類の変更 拒否される(種類の変更 = 削除 + 追加)。

plasma が拒否する変更については、3 つの選択肢があります:

  1. その変更を 新しい table / column として出荷し、アプリコードでデータを移行する。
  2. SCHEMA_VERSION をインクリメントし、クライアントで onSchemaMismatch を使ってユーザーにローカル状態のリセットを促す(optimistic な保留中の mutation を失う代わりに、新しい schema を得る)。
  3. wrangler d1 execute から手動で DDL を実行する — 正しさは自己責任。

SCHEMA_VERSION — クライアント/サーバーのハンドシェイク

Section titled “SCHEMA_VERSION — クライアント/サーバーのハンドシェイク”

すべての push と pull はリクエストに schemaVersion を含めます。サーバーの SyncHandlerOptions.schemaVersion が一致しない場合、ハンドラは 409 Conflict を返します:

HTTP/1.1 409 Conflict
{
"error": "schema mismatch",
"expected": "todos-v2"
}

クライアントでは、plasma は SyncClientError({ kind: "schema-mismatch", phase, expected }) を発生させ、指定していれば options.onSchemaMismatch({ phase }) を呼びます。ローカル状態を消去して新規に始めるには "reset" を、ローカル IDB をそのままにして onError を通じて不一致を表面化するだけなら "stay" を返します。

createPlasmaClient({
schema,
mutators,
endpoint: "/sync",
schemaVersion: "todos-v2",
onSchemaMismatch: async ({ phase }) => {
if (confirm(`Server updated; clear local data and reload?`)) {
return "reset"
}
return "stay"
},
})

SCHEMA_VERSION をインクリメントするとき

Section titled “SCHEMA_VERSION をインクリメントするとき”
  • 古いクライアントが pull で静かに見逃す新しい column を追加した(null を読むが、schema は non-null と言う)。
  • 既存の mutator に必須の arg を追加した(古いクライアントは args 検証に失敗するペイロードを push する)。
  • 既存の mutator のセマンティクスを後方互換性のない形で変更した。

次のためには インクリメントしないでください:

  • 新しい mutator の追加(古いクライアントは単に呼べない)。
  • 新しい table の追加(古いクライアントは無視する)。
  • 表面的な変更(.default() のリネーム、どこかのクエリへの集約の追加)。

ensureSchema / runMigrations の呼び出しは、wrangler d1 execute --command ".schema" や psql の \d で見える DDL を発行します。デバッグ時や移行時に役立ちます。

テーブル 用途 作成主
_plasma_changes 全書き込みの change log 行 (row_version / table / id / op / value / created_at / origin) — Change Log と Cookie ensureSchema
_plasma_client_mutations (clientGroupID, clientID) 単位の last_mutation_id 監視値 — 重複排除台帳 ensureSchema
_plasma_origin AFTER-write トリガーに (clientGroupID, clientID, mutationID) を渡す 1 行スクラッチ ensureSchema
_plasma_blobs ハッシュ単位の blob 状態 (absent / uploading / present / orphaned) — Files and Blobs ensureSchema
_plasma_blob_refs Blob 読取り auth 用の逆引きインデックス (hash, table_name, row_id, column_name) ensureSchema
_plasma_region_versions SequencerDO が使うリージョン別の monotonic version 状態 — Multi-region ensureRegionVersionsTable

これらは SELECT で state 確認できますが、直接変更すると sync セマンティクスが壊れます — 常に mutator / sync handler 経由で書き込んでください。

各 user table に、SQLite なら 3 つの AFTER トリガー、Postgres なら 3 つの AFTER トリガー関数を install します:

-- todos テーブルに対する SQLite トリガーの例
CREATE TRIGGER IF NOT EXISTS _plasma_trg_todos_insert
AFTER INSERT ON todos
BEGIN
INSERT INTO _plasma_changes
(table_name, row_id, op, value, created_at, client_group_id, client_id, mutation_id)
VALUES
('todos', NEW.id, 'put', json_object(...), (unixepoch() * 1000),
(SELECT client_group_id FROM _plasma_origin LIMIT 1),
(SELECT client_id FROM _plasma_origin LIMIT 1),
(SELECT mutation_id FROM _plasma_origin LIMIT 1));
END;

UPDATENEW.* を、DELETEOLD.* + op = 'del' + value = NULL を使う同じ形。Postgres は本体を plpgsql 関数でラップし各トリガーに束ねます。

含意: plasma を経由しない raw driver の書き込みでも _plasma_changes に載り、次回 pull で全クライアントへ配送されます。wrangler d1 execute "INSERT INTO todos ..." がブラウザに反映される理由がこれです。

Raw SQL / 他 migration ツールとの共存

Section titled “Raw SQL / 他 migration ツールとの共存”

seed / 一発分析 / データ移行 / wrangler d1 migrations や drizzle-kit との共存など、runtime 経路以外で DDL を発行したいことがあります。

Raw SQL による user table への書き込みは OK。 AFTER トリガーが _plasma_changes に反映し、次回 pull で全クライアントに届きます。

Raw SQL による _plasma_* 内部テーブルへの書き込みは NG。 de-dup / cursor / blob-ref state を壊します。リセットが必要なら client で resetLocalState を呼び、_plasma_changes は自前で truncate。

Terminal window
# OK — user table
wrangler d1 execute my-app --command "INSERT INTO todos (id, title, done, userId, updatedAt) VALUES ('cron-1', 'Nightly report', 0, 'system', 1735000000000)"
# 危険 — 内部テーブル (`compactChangeLog` を使う方が安全)
wrangler d1 execute my-app --command "DELETE FROM _plasma_changes WHERE row_version < 5000000"

Drizzle から plasma に移行途中で drizzle-kit の migration が残っている場合:

  1. plasma tables に対しては drizzle-kit を退役。 plasma 管理になった table の drizzle-kit migration は削除し runMigrations に任せる。
  2. plasma 対象外 table には drizzle-kit を残す。 admin 用 / analytics / cron 用など plasma が sync しない table なら共存可。

同じ table を両者が管理するのは避けてください。 drizzle-kit の DROP COLUMN / RENAME は plasma の MigrationRefused をバイパスしてクライアントを壊せます。

既存の本番 DB を plasma に取り込む

Section titled “既存の本番 DB を plasma に取り込む”

Drizzle / raw-SQL の既存本番 DB を plasma 化する手順:

  1. plasma が要求する column を live schema に追加。 id (未 UUID text なら)、必要な file() column、default 値。
  2. live DB に ensureSchema を実行。 _plasma_changes と周辺テーブルが作成され、トリガーが install される。既存 row は _plasma_changes に backfill されない — pull はこれ以降の書き込みから始まる。
  3. オプション: 履歴を _plasma_changes に seed。 time-travel で既存 row も見せたい場合:
    INSERT INTO _plasma_changes (row_version, table_name, row_id, op, value, created_at)
    SELECT row_number() OVER () AS row_version,
    'todos', id, 'put', json_object('id', id, 'title', title, ...),
    extract(epoch from now()) * 1000
    FROM todos;
  4. plasma Worker を deploy。 ここからクライアントの pull が始まります。
  • Postgres: user table + トリガー方向ごとに 1 つの plpgsql 関数 (_plasma_trg_<table>_insert_fn 等) を作成し、CREATE OR REPLACE TRIGGER でバインド。CREATE OR REPLACE FUNCTION なので再生成も安全。
  • : _plasma_changes.row_version は Postgres で BIGSERIAL、SQLite で INTEGER PRIMARY KEY AUTOINCREMENT。両方とも単調 id。
  • タイムスタンプ: created_at は両方 BIGINT (unix ms)。方言間で揃えるために TIMESTAMPTZ は使わない。
  • JSON: user の json() column は Postgres で JSONB、SQLite で TEXT

Deploy は D1 と同じ — admin route or deploy hook から runMigrations を呼びます。Hyperdrive がコネクションを透過的に前段。

Refused 変更のゼロダウンタイムパターン

Section titled “Refused 変更のゼロダウンタイムパターン”

runMigrations が drop / rename / kind 変更を拒むので、deploy 期間中は両方の shape を保つ 3 段階の移行が定石。

  1. Deploy 1: runMigrations で新 column を追加 (heading nullable)。管理スクリプトで heading = title に backfill。mutators を両 column 書き込みに更新。読み取りが新名を要求するなら SCHEMA_VERSION を bump。旧クライアントは引き続き title を読める。
  2. Deploy 2: アプリコードの読み取りを heading に切替。両 column への書き込みは継続。
  3. Deploy 3: title への書き込みを停止。全 live クライアントの update 済みを確認後、wrangler d1 execute "ALTER TABLE todos DROP COLUMN title" を実行。schema.ts から title を削り SCHEMA_VERSION をもう 1 度 bump。

新 column で同じパターン:

  • Deploy 1: todoIdBig: bigint().nullable() を追加、SELECT id, ..., CAST(idInt AS BIGINT) AS todoIdBig FROM todos で backfill。
  • Deploy 2: todoIdBig から読取り、両方に書き込み。
  • Deploy 3: raw SQL で idInt を drop、schema から削除。

新 table を作成 → dual write → 読取り切替 → 旧 drop。

パターンは全部同じ: 前進追加 → backfill → dual-write → 切替 → 旧削除。drop は plasma がやらないので、依存クライアントが居ないと自信を持てたところで手動で。

runMigrations は複数 statement 越しにトランザクショナルではありません。途中で失敗した場合 (network 断など)、column が一部追加、トリガー一部再生成、他は未処理 — の partial state になり得ます。

  1. Idempotency が助けになる。 各 step は IF NOT EXISTS / CREATE OR REPLACE。再度 runMigrations を呼べば止まった箇所から拾って同じ最終状態に到達します。
  2. ensureSchema を最初に、runMigrations を後で。 deploy でも request path で ensureSchema を呼び続ける方が良い。partial な runMigrations_plasma_blobs を消しても、次回 ensureSchema が復元します。
  3. Column 追加の自動 rollback は無い。 ALTER TABLE ADD COLUMN は片道。失敗と判断したら、旧クライアントの依存が切れた後に手動 DDL で drop (前述の 3 段階パターン)。

「migration が prod を壊した」場合の対応: 前の Worker deploy に fall back。古い SCHEMA_VERSION と古い handler が戻ります。DB に残った追加 column は無害 — 古いコードは参照しないだけ。

複数インスタンス / マルチリージョンの migration 順序

Section titled “複数インスタンス / マルチリージョンの migration 順序”

同じ D1 (or Hyperdrive 経由の同じ Postgres) の背後に N Worker インスタンスがある場合、request 経路の ensureSchema は毎リクエスト・毎インスタンスで走ります。並行性は安全 (全 DDL が IF NOT EXISTS) ですが無駄。

推奨:

  • ensureSchema は fetch handler で。 cold start ごとの catalogue query 数個 — 有界コスト。
  • runMigrations は 1 つの admin route から。 deploy ごとに CI から 1 回。実際の ALTER TABLE はここでだけ走る。

Multi-region (SequencerDO) では、新 column を要求するコードが deploy される 前に schema migration を通します。sequencer は user schema に非依存なので触らなくて OK。

migration をローカルでテストする

Section titled “migration をローカルでテストする”

runMigrations は任意の SqlExecutor に対して実行できます — テスト用に better-sqlite3 のインメモリ DB を指すことができます:

import BetterSqlite3 from "better-sqlite3"
import { fromBetterSqlite3, runMigrations, ensureSchema } from "@sh1n4ps/plasma-server"
const sqlite = new BetterSqlite3(":memory:")
const executor = fromBetterSqlite3(sqlite)
await ensureSchema({ schema: v1Schema, executor })
// いくつかの行を挿入...
await runMigrations({ schema: v2Schema, executor })
// 追加された column が存在し、データが失われていないことをアサート。

fromBetterSqlite3 は本物の SqlExecutor 実装なので、両エンジンで同一の mutation セマンティクスがテストを通じて保たれます。