sc-dev-deploy/test/managedRowsGate.js
2026-06-01 16:43:43 -05:00

255 lines
11 KiB
JavaScript

// Phase 2 gate: the MANAGED-ROWS (row-data sync) feature on multi-tenant
// Postgres. Proves the rowIdentity PG path + per-tenant isolation + cross-tenant
// row sync end to end:
// 1. mark a table managed on tenant t1 -> rowIdentity.ensureManagedSchema adds
// the hidden _dd_row_uuid column to t1's tenant-schema table and backfills
// existing rows with UUIDs (the SQLite-vs-PG portable column path), and the
// initial ship journals a set_table_mode + one insert_row op per row;
// 2. ISOLATION: sibling tenant t2 gets no _dd_table_modes row for that table;
// 3. SYNC: promote t1 -> t2 (tenant-to-tenant peering, Phase 1) -> t2 applies
// the ops, creating the table with the SAME _dd_row_uuid values (stable
// cross-environment row identity) -- the apply-on-PG path.
//
// HTTP drives the feature; psql introspects each tenant's schema (the _dd_row_uuid
// column + row UUIDs aren't observable over HTTP). Self-skips if PG :3002 or psql
// is unavailable. Run: node test/managedRowsGate.js
const http = require("http");
const net = require("net");
const { execFileSync } = require("child_process");
const PG_PORT = 3002;
const ADMIN_PW = "AdminP@ss1";
const PGENV = Object.assign({}, process.env, {
PGHOST: "/var/run/postgresql",
PGUSER: "scott",
PGDATABASE: "saltcorn_idp",
PGPASSWORD: "peer"
});
const thost = (t) => t + ".localhost.localdomain:" + PG_PORT;
const jars = { t1: {}, t2: {} };
let pass = 0;
let fail = 0;
const ok = (cond, msg) => {
if (cond) {
pass++;
console.log(" PASS " + msg);
} else {
fail++;
console.log(" FAIL " + msg);
}
};
const portOpen = (port) => {
return new Promise((resolve) => {
const s = net.connect(port, "127.0.0.1");
const done = (up) => { s.destroy(); resolve(up); };
s.setTimeout(1000);
s.on("connect", () => done(true));
s.on("timeout", () => done(false));
s.on("error", () => done(false));
});
};
const psql = (sql) => execFileSync("psql", ["-tAqc", sql], { env: PGENV }).toString().trim();
const psqlLines = (sql) => psql(sql).split("\n").map((s) => s.trim()).filter(Boolean);
const storeCookies = (t, headers) => {
const sc = headers["set-cookie"];
if (!sc) {
return;
}
for (const line of sc) {
const pair = line.split(";")[0];
const eq = pair.indexOf("=");
if (eq > 0) {
jars[t][pair.slice(0, eq).trim()] = pair.slice(eq + 1).trim();
}
}
};
const request = (t, method, path, opts) => {
const options = opts || {};
return new Promise((resolve, reject) => {
const headers = Object.assign({ Host: thost(t) }, options.headers || {});
const jar = jars[t];
if (Object.keys(jar).length > 0) {
headers["Cookie"] = Object.keys(jar).map((k) => k + "=" + jar[k]).join("; ");
}
let data = null;
if (options.body) {
data = new URLSearchParams(options.body).toString();
headers["Content-Type"] = "application/x-www-form-urlencoded";
headers["Content-Length"] = Buffer.byteLength(data);
}
const r = http.request({ host: "127.0.0.1", port: PG_PORT, method: method, path: path, headers: headers }, (resp) => {
storeCookies(t, resp.headers);
let body = "";
resp.on("data", (c) => { body += c; });
resp.on("end", () => resolve({ status: resp.statusCode, headers: resp.headers, loc: resp.headers.location, body: body }));
});
r.on("error", reject);
if (data !== null) {
r.write(data);
}
r.end();
});
};
const csrfOf = (h) => { const m = h.match(/name="_csrf" value="([^"]+)"/); return m ? m[1] : ""; };
const opCountOf = (h) => { const m = h.match(/<th>Ops recorded<\/th><td>(\d+)<\/td>/); return m ? parseInt(m[1], 10) : null; };
const envIdOf = (h) => { const m = h.match(/env_id<\/strong> is <code>([0-9a-f-]{36})<\/code>/); return m ? m[1] : ""; };
const secretOf = (h) => { const m = h.match(/<p class="secret">([0-9a-f]+)<\/p>/); return m ? m[1] : ""; };
const ddCsrf = async (t) => csrfOf((await request(t, "GET", "/admin/dev-deploy/peers")).body);
const authed = async (t) => /true/.test((await request(t, "GET", "/auth/authenticated")).body);
const bootstrap = async (t) => {
const lp = await request(t, "GET", "/auth/login");
await request(t, "POST", "/auth/login", { body: { email: "admin@" + t + ".local", password: ADMIN_PW, _csrf: csrfOf(lp.body) } });
if (await authed(t)) {
return true;
}
const cp = await request(t, "GET", "/auth/create_first_user");
const cc = csrfOf(cp.body);
if (cc) {
await request(t, "POST", "/auth/create_first_user", { body: { email: "admin@" + t + ".local", password: ADMIN_PW, default_language: "en", _csrf: cc } });
}
return await authed(t);
};
const peerIdFor = (html, envId) => {
for (const row of html.split("<tr>")) {
if (row.indexOf("<code>" + envId + "</code>") >= 0) {
const m = row.match(/name="peer_id" value="(\d+)"/);
if (m) {
return m[1];
}
}
}
return null;
};
const clearPeer = async (t, envId) => {
const pid = peerIdFor((await request(t, "GET", "/admin/dev-deploy/peers")).body, envId);
if (pid) {
await request(t, "POST", "/admin/dev-deploy/peers/delete", { body: { peer_id: pid, _csrf: await ddCsrf(t) } });
}
};
const cleanup = (tbl) => {
for (const t of ["t1", "t2"]) {
try { psql(`DROP TABLE IF EXISTS "${t}"."${tbl}"`); } catch (e) { /* ignore */ }
try { psql(`DELETE FROM "${t}"._dd_entity_ids WHERE current_name='${tbl}'`); } catch (e) { /* ignore */ }
try { psql(`DELETE FROM "${t}"._dd_table_modes WHERE table_uuid IN (SELECT uuid FROM "${t}"._dd_entity_ids WHERE current_name='${tbl}')`); } catch (e) { /* ignore */ }
}
};
const run = async (tbl) => {
ok(await bootstrap("t1"), "t1 admin session");
ok(await bootstrap("t2"), "t2 admin session");
// Create the table on t1 only; seed two rows directly (the wrap journals the
// initial ship from these existing rows when we mark the table managed).
const c1 = await ddCsrf("t1");
const cr = await request("t1", "POST", "/table", { body: { name: tbl, _csrf: c1 } });
ok(cr.status >= 300 && cr.status < 400 && /\/table\/\d+/.test(cr.loc || ""), "t1 created table " + tbl + " (HTTP " + cr.status + ")");
psql(`INSERT INTO "t1"."${tbl}" DEFAULT VALUES`);
psql(`INSERT INTO "t1"."${tbl}" DEFAULT VALUES`);
ok(psql(`SELECT count(*) FROM "t1"."${tbl}"`) === "2", "t1 seeded 2 rows");
const tblUuid = psql(`SELECT uuid FROM "t1"._dd_entity_ids WHERE kind='table' AND current_name='${tbl}'`);
ok(/^[0-9a-f-]{36}$/.test(tblUuid), "t1 tracked the table (uuid " + tblUuid + ")");
const t1Ops0 = opCountOf((await request("t1", "GET", "/admin/dev-deploy/")).body);
const t2Ops0 = opCountOf((await request("t2", "GET", "/admin/dev-deploy/")).body);
// Mark managed on t1.
const setr = await request("t1", "POST", "/admin/dev-deploy/tables/set", { body: { table_uuid: tblUuid, data_mode: "managed", _csrf: await ddCsrf("t1") } });
const setMsg = decodeURIComponent(setr.loc || "");
ok(setr.status >= 300 && setr.status < 400 && /managed/.test(setMsg) && !/err=/.test(setMsg), "t1 marked table managed (" + setMsg.replace(/^.*\?/, "") + ")");
// rowIdentity PG path: column added + every existing row backfilled.
const colT1 = psql(`SELECT count(*) FROM information_schema.columns WHERE table_schema='t1' AND table_name='${tbl}' AND column_name='_dd_row_uuid'`);
ok(colT1 === "1", "t1 table gained the _dd_row_uuid column (PG ensureManagedSchema)");
ok(psql(`SELECT count(*) FROM "t1"."${tbl}" WHERE _dd_row_uuid IS NOT NULL`) === "2", "t1 backfilled both rows with UUIDs");
// Initial ship journaled: set_table_mode + 2 insert_row = 3 ops.
const t1Ops1 = opCountOf((await request("t1", "GET", "/admin/dev-deploy/")).body);
ok(t1Ops1 !== null && t1Ops1 >= t1Ops0 + 3, "t1 journaled the managed ship (" + t1Ops0 + " -> " + t1Ops1 + ", +>=3)");
// ISOLATION: marking managed on t1 did not touch t2's _dd_table_modes.
ok(psql(`SELECT count(*) FROM "t2"._dd_table_modes WHERE table_uuid='${tblUuid}'`) === "0", "t2 has NO _dd_table_modes row for the table (isolation)");
ok(t2Ops0 === opCountOf((await request("t2", "GET", "/admin/dev-deploy/")).body), "t2 journal unchanged by t1's managed ship");
// SYNC: pair t1 -> t2 and promote. t2 applies create_table + set_table_mode +
// insert_row, recreating the table with the SAME row UUIDs.
const t1Env = envIdOf((await request("t1", "GET", "/admin/dev-deploy/peers")).body);
const t2Env = envIdOf((await request("t2", "GET", "/admin/dev-deploy/peers")).body);
await clearPeer("t1", t2Env);
await clearPeer("t2", t1Env);
const addRes = await request("t1", "POST", "/admin/dev-deploy/peers/add", { body: { env_id: t2Env, label: "t2", base_url: "http://" + thost("t2"), _csrf: await ddCsrf("t1") } });
const secret = secretOf(addRes.body);
ok(/^[0-9a-f]{64}$/.test(secret), "t1 paired with t2");
await request("t2", "POST", "/admin/dev-deploy/peers/add", { body: { env_id: t1Env, label: "t1", base_url: "http://" + thost("t1"), existing_secret: secret, _csrf: await ddCsrf("t2") } });
const t2PeerId = peerIdFor((await request("t1", "GET", "/admin/dev-deploy/peers")).body, t2Env);
const prom = await request("t1", "POST", "/admin/dev-deploy/promote", { body: { peer_id: t2PeerId, _csrf: await ddCsrf("t1") } });
ok(prom.status >= 200 && prom.status < 400, "t1 promote -> t2 (HTTP " + prom.status + ")");
// t2 now has the table, managed, with the SAME _dd_row_uuid values as t1.
const colT2 = psql(`SELECT count(*) FROM information_schema.columns WHERE table_schema='t2' AND table_name='${tbl}' AND column_name='_dd_row_uuid'`);
ok(colT2 === "1", "t2 received the table WITH the _dd_row_uuid column (apply on PG)");
ok(psql(`SELECT count(*) FROM "t2"._dd_table_modes m JOIN "t2"._dd_entity_ids e ON e.uuid=m.table_uuid WHERE e.current_name='${tbl}' AND m.data_mode='managed'`) === "1", "t2's table is now data_mode=managed");
const u1 = psqlLines(`SELECT _dd_row_uuid FROM "t1"."${tbl}" ORDER BY _dd_row_uuid`);
const u2 = psqlLines(`SELECT _dd_row_uuid FROM "t2"."${tbl}" ORDER BY _dd_row_uuid`);
ok(u2.length === 2 && JSON.stringify(u1) === JSON.stringify(u2), "t2 rows carry the SAME UUIDs as t1 (stable cross-tenant row identity)");
// Cleanup peers + tables.
await clearPeer("t1", t2Env);
await clearPeer("t2", t1Env);
cleanup(tbl);
console.log("\nRESULT: " + pass + " passed, " + fail + " failed");
process.exit(fail ? 1 : 0);
};
const main = async () => {
if (!(await portOpen(PG_PORT))) {
console.log("SKIP: Postgres multi-tenant instance not reachable on 127.0.0.1:" + PG_PORT);
process.exit(0);
}
try {
execFileSync("psql", ["-tAqc", "SELECT 1"], { env: PGENV });
} catch (e) {
console.log("SKIP: psql/Postgres not available for schema introspection");
process.exit(0);
}
const tbl = "mr" + Date.now();
try {
await run(tbl);
} catch (e) {
cleanup(tbl);
throw e;
}
};
main().catch((e) => {
console.error("MANAGED-ROWS GATE ERROR:", e && (e.stack || e.message || e));
process.exit(2);
});