// Phase 2 gate: the MANAGED-ROWS (row-data sync) feature on multi-tenant // Postgres. Proves the rowIdentity PG path + per-tenant isolation + cross-tenant // row sync end to end: // 1. mark a table managed on tenant t1 -> rowIdentity.ensureManagedSchema adds // the hidden _dd_row_uuid column to t1's tenant-schema table and backfills // existing rows with UUIDs (the SQLite-vs-PG portable column path), and the // initial ship journals a set_table_mode + one insert_row op per row; // 2. ISOLATION: sibling tenant t2 gets no _dd_table_modes row for that table; // 3. SYNC: promote t1 -> t2 (tenant-to-tenant peering, Phase 1) -> t2 applies // the ops, creating the table with the SAME _dd_row_uuid values (stable // cross-environment row identity) -- the apply-on-PG path. // // HTTP drives the feature; psql introspects each tenant's schema (the _dd_row_uuid // column + row UUIDs aren't observable over HTTP). Self-skips if PG :3002 or psql // is unavailable. Run: node test/managedRowsGate.js const http = require("http"); const net = require("net"); const { execFileSync } = require("child_process"); const PG_PORT = 3002; const ADMIN_PW = "AdminP@ss1"; const PGENV = Object.assign({}, process.env, { PGHOST: "/var/run/postgresql", PGUSER: "scott", PGDATABASE: "saltcorn_idp", PGPASSWORD: "peer" }); const thost = (t) => t + ".localhost.localdomain:" + PG_PORT; const jars = { t1: {}, t2: {} }; let pass = 0; let fail = 0; const ok = (cond, msg) => { if (cond) { pass++; console.log(" PASS " + msg); } else { fail++; console.log(" FAIL " + msg); } }; const portOpen = (port) => { return new Promise((resolve) => { const s = net.connect(port, "127.0.0.1"); const done = (up) => { s.destroy(); resolve(up); }; s.setTimeout(1000); s.on("connect", () => done(true)); s.on("timeout", () => done(false)); s.on("error", () => done(false)); }); }; const psql = (sql) => execFileSync("psql", ["-tAqc", sql], { env: PGENV }).toString().trim(); const psqlLines = (sql) => psql(sql).split("\n").map((s) => 
s.trim()).filter(Boolean); const storeCookies = (t, headers) => { const sc = headers["set-cookie"]; if (!sc) { return; } for (const line of sc) { const pair = line.split(";")[0]; const eq = pair.indexOf("="); if (eq > 0) { jars[t][pair.slice(0, eq).trim()] = pair.slice(eq + 1).trim(); } } }; const request = (t, method, path, opts) => { const options = opts || {}; return new Promise((resolve, reject) => { const headers = Object.assign({ Host: thost(t) }, options.headers || {}); const jar = jars[t]; if (Object.keys(jar).length > 0) { headers["Cookie"] = Object.keys(jar).map((k) => k + "=" + jar[k]).join("; "); } let data = null; if (options.body) { data = new URLSearchParams(options.body).toString(); headers["Content-Type"] = "application/x-www-form-urlencoded"; headers["Content-Length"] = Buffer.byteLength(data); } const r = http.request({ host: "127.0.0.1", port: PG_PORT, method: method, path: path, headers: headers }, (resp) => { storeCookies(t, resp.headers); let body = ""; resp.on("data", (c) => { body += c; }); resp.on("end", () => resolve({ status: resp.statusCode, headers: resp.headers, loc: resp.headers.location, body: body })); }); r.on("error", reject); if (data !== null) { r.write(data); } r.end(); }); }; const csrfOf = (h) => { const m = h.match(/name="_csrf" value="([^"]+)"/); return m ? m[1] : ""; }; const opCountOf = (h) => { const m = h.match(/
([0-9a-f-]{36})<\/code>/); return m ? m[1] : ""; };
/**
 * Pull the first run of lowercase-hex characters that sits directly before a
 * closing `</p>` tag out of an HTML string.
 *
 * @param {string} h - HTML text to search.
 * @returns {string} The captured hex run, or "" when nothing matches.
 */
const secretOf = (h) => {
  const found = h.match(/([0-9a-f]+)<\/p>/);
  if (!found) {
    return "";
  }
  return found[1];
};
/**
 * GET the dev-deploy peers page as tenant `t` and hand its body to `csrfOf`.
 *
 * @param {string} t - Tenant key passed through to `request`.
 * @returns {Promise<string>} Whatever `csrfOf` extracts from the page body.
 */
const ddCsrf = async (t) => {
  const peersPage = await request(t, "GET", "/admin/dev-deploy/peers");
  return csrfOf(peersPage.body);
};
/**
 * Ask `/auth/authenticated` as tenant `t` and report whether the response
 * body contains the text "true".
 *
 * @param {string} t - Tenant key passed through to `request`.
 * @returns {Promise<boolean>} true when the body matches /true/.
 */
const authed = async (t) => {
  const reply = await request(t, "GET", "/auth/authenticated");
  return /true/.test(reply.body);
};
/**
 * Obtain an admin session for tenant `t`.
 *
 * First tries a normal login as `admin@<t>.local` with ADMIN_PW. If the tenant
 * is still not authenticated afterwards, it loads the create-first-user page
 * and, when that page yields a CSRF token, submits the first-user form with
 * the same credentials.
 *
 * @param {string} t - Tenant key (also used to build the admin e-mail).
 * @returns {Promise<boolean>} The final result of `authed(t)`.
 */
const bootstrap = async (t) => {
  const email = "admin@" + t + ".local";

  // Attempt 1: log in with the existing admin account.
  const loginPage = await request(t, "GET", "/auth/login");
  const loginForm = { email: email, password: ADMIN_PW, _csrf: csrfOf(loginPage.body) };
  await request(t, "POST", "/auth/login", { body: loginForm });
  const loggedIn = await authed(t);
  if (loggedIn) {
    return true;
  }

  // Attempt 2: create the first user, but only when the form exposes a token.
  const setupPage = await request(t, "GET", "/auth/create_first_user");
  const setupToken = csrfOf(setupPage.body);
  if (setupToken) {
    const setupForm = { email: email, password: ADMIN_PW, default_language: "en", _csrf: setupToken };
    await request(t, "POST", "/auth/create_first_user", { body: setupForm });
  }
  return await authed(t);
};
/**
 * Find the peer id associated with an environment id in the peers page HTML.
 *
 * Splits `html` into pieces, and for the first piece that contains `envId`
 * AND a `name="peer_id" value="<digits>"` attribute pair, returns those digits.
 *
 * @param {string} html - Body of the peers admin page.
 * @param {string} envId - Environment id to look for.
 * @returns {string|null} The captured peer id digits, or null when no piece matches.
 */
const peerIdFor = (html, envId) => {
// NOTE(review): the split delimiter below is a double-quoted literal that
// spans two physical lines, which does not parse in JavaScript, and the
// indexOf needle is wrapped in empty strings. This looks like markup was
// stripped from both literals -- confirm the intended delimiters against the
// original file before relying on this function.
for (const row of html.split("
")) {
if (row.indexOf("" + envId + "") >= 0) {
// Only a piece that mentions envId is searched for the peer_id form field.
const m = row.match(/name="peer_id" value="(\d+)"/);
if (m) {
return m[1];
}
}
}
return null;
};
/**
 * Delete tenant `t`'s peer entry for environment `envId`, if one is listed.
 *
 * Looks the peer id up on the peers page via `peerIdFor`; when nothing is
 * found, no delete request is sent.
 *
 * @param {string} t - Tenant key whose peer list is edited.
 * @param {string} envId - Environment id of the peer to remove.
 * @returns {Promise<void>}
 */
const clearPeer = async (t, envId) => {
  const peersPage = await request(t, "GET", "/admin/dev-deploy/peers");
  const peerId = peerIdFor(peersPage.body, envId);
  if (!peerId) {
    return;
  }
  // A fresh CSRF token is fetched right before the delete is posted.
  const token = await ddCsrf(t);
  const form = { peer_id: peerId, _csrf: token };
  await request(t, "POST", "/admin/dev-deploy/peers/delete", { body: form });
};
/**
 * Best-effort removal of what this gate created for table `tbl` in both
 * tenant schemas ("t1" and "t2"): the table itself, its `_dd_table_modes`
 * rows, and its `_dd_entity_ids` rows.
 *
 * Every statement is attempted independently and failures are ignored, so a
 * missing table or bookkeeping relation on one tenant never blocks the rest
 * of the cleanup (this also runs from the error path in `main`).
 *
 * @param {string} tbl - Name of the test table to remove.
 * @returns {void}
 */
const cleanup = (tbl) => {
  // Run one statement, deliberately ignoring any failure (best-effort).
  const attempt = (sql) => {
    try {
      psql(sql);
    } catch (e) {
      /* ignore */
    }
  };

  for (const t of ["t1", "t2"]) {
    attempt(`DROP TABLE IF EXISTS "${t}"."${tbl}"`);
    // The mode rows are located through a subquery on _dd_entity_ids, so they
    // must be deleted BEFORE the entity-id rows; in the opposite order the
    // subquery is already empty and the mode rows are left behind.
    attempt(`DELETE FROM "${t}"._dd_table_modes WHERE table_uuid IN (SELECT uuid FROM "${t}"._dd_entity_ids WHERE current_name='${tbl}')`);
    attempt(`DELETE FROM "${t}"._dd_entity_ids WHERE current_name='${tbl}'`);
  }
};
/**
 * Execute the managed-rows gate scenario for table `tbl` and exit the process.
 *
 * Sequence: get admin sessions on t1 and t2, create `tbl` on t1 and seed two
 * rows through psql, mark the table managed on t1, check the `_dd_row_uuid`
 * column/backfill and the op count on t1, check that t2 is untouched, pair
 * t1 with t2 and promote, then check that t2 has the table with the same row
 * UUIDs. Finally removes the peers and tables, prints the pass/fail totals
 * and exits with 1 when any check failed, 0 otherwise.
 *
 * @param {string} tbl - Name of the table to create and sync.
 * @returns {Promise<void>} Does not return normally; ends in process.exit.
 */
const run = async (tbl) => {
  // Local helpers for the patterns this scenario repeats.
  const isRedirect = (res) => res.status >= 300 && res.status < 400;
  const journalOps = async (t) => {
    const page = await request(t, "GET", "/admin/dev-deploy/");
    return opCountOf(page.body);
  };
  const peersHtml = async (t) => {
    const page = await request(t, "GET", "/admin/dev-deploy/peers");
    return page.body;
  };
  const rowUuidColumnCount = (schema) =>
    psql(`SELECT count(*) FROM information_schema.columns WHERE table_schema='${schema}' AND table_name='${tbl}' AND column_name='_dd_row_uuid'`);
  const rowUuidsOf = (schema) =>
    psqlLines(`SELECT _dd_row_uuid FROM "${schema}"."${tbl}" ORDER BY _dd_row_uuid`);
  const baseUrlOf = (t) => "http://" + thost(t);

  for (const t of ["t1", "t2"]) {
    ok(await bootstrap(t), t + " admin session");
  }

  // The table is created on t1 only, and two rows are seeded straight through
  // psql; marking the table managed is then expected to journal the initial
  // ship from those pre-existing rows.
  const createToken = await ddCsrf("t1");
  const created = await request("t1", "POST", "/table", { body: { name: tbl, _csrf: createToken } });
  const createdOk = isRedirect(created) && /\/table\/\d+/.test(created.loc || "");
  ok(createdOk, `t1 created table ${tbl} (HTTP ${created.status})`);

  const seedSql = `INSERT INTO "t1"."${tbl}" DEFAULT VALUES`;
  psql(seedSql);
  psql(seedSql);
  const seededCount = psql(`SELECT count(*) FROM "t1"."${tbl}"`);
  ok(seededCount === "2", "t1 seeded 2 rows");

  const tblUuid = psql(`SELECT uuid FROM "t1"._dd_entity_ids WHERE kind='table' AND current_name='${tbl}'`);
  ok(/^[0-9a-f-]{36}$/.test(tblUuid), `t1 tracked the table (uuid ${tblUuid})`);

  // Op-count baselines for both tenants, taken before the mode change.
  const t1OpsBefore = await journalOps("t1");
  const t2OpsBefore = await journalOps("t2");

  // Flip the table to managed on t1.
  const modeToken = await ddCsrf("t1");
  const modeForm = { table_uuid: tblUuid, data_mode: "managed", _csrf: modeToken };
  const marked = await request("t1", "POST", "/admin/dev-deploy/tables/set", { body: modeForm });
  const markedMsg = decodeURIComponent(marked.loc || "");
  const markedOk = isRedirect(marked) && /managed/.test(markedMsg) && !/err=/.test(markedMsg);
  ok(markedOk, "t1 marked table managed (" + markedMsg.replace(/^.*\?/, "") + ")");

  // rowIdentity on PG: the hidden column exists and both old rows have UUIDs.
  ok(rowUuidColumnCount("t1") === "1", "t1 table gained the _dd_row_uuid column (PG ensureManagedSchema)");
  const backfilled = psql(`SELECT count(*) FROM "t1"."${tbl}" WHERE _dd_row_uuid IS NOT NULL`);
  ok(backfilled === "2", "t1 backfilled both rows with UUIDs");

  // Initial ship: set_table_mode plus two insert_row ops, i.e. at least +3.
  const t1OpsAfter = await journalOps("t1");
  const shipJournaled = t1OpsAfter !== null && t1OpsAfter >= t1OpsBefore + 3;
  ok(shipJournaled, `t1 journaled the managed ship (${t1OpsBefore} -> ${t1OpsAfter}, +>=3)`);

  // ISOLATION: nothing about t1's change may show up on t2.
  const t2ModeRows = psql(`SELECT count(*) FROM "t2"._dd_table_modes WHERE table_uuid='${tblUuid}'`);
  ok(t2ModeRows === "0", "t2 has NO _dd_table_modes row for the table (isolation)");
  const t2OpsAfter = await journalOps("t2");
  ok(t2OpsBefore === t2OpsAfter, "t2 journal unchanged by t1's managed ship");

  // SYNC: pair t1 -> t2 and promote; t2 is expected to apply create_table,
  // set_table_mode and insert_row and end up with identical row UUIDs.
  const t1Env = envIdOf(await peersHtml("t1"));
  const t2Env = envIdOf(await peersHtml("t2"));
  await clearPeer("t1", t2Env);
  await clearPeer("t2", t1Env);

  const pairToken = await ddCsrf("t1");
  const pairForm = { env_id: t2Env, label: "t2", base_url: baseUrlOf("t2"), _csrf: pairToken };
  const paired = await request("t1", "POST", "/admin/dev-deploy/peers/add", { body: pairForm });
  const secret = secretOf(paired.body);
  ok(/^[0-9a-f]{64}$/.test(secret), "t1 paired with t2");

  const backPairToken = await ddCsrf("t2");
  const backPairForm = { env_id: t1Env, label: "t1", base_url: baseUrlOf("t1"), existing_secret: secret, _csrf: backPairToken };
  await request("t2", "POST", "/admin/dev-deploy/peers/add", { body: backPairForm });

  const t2PeerId = peerIdFor(await peersHtml("t1"), t2Env);
  const promoteToken = await ddCsrf("t1");
  const promoted = await request("t1", "POST", "/admin/dev-deploy/promote", { body: { peer_id: t2PeerId, _csrf: promoteToken } });
  ok(promoted.status >= 200 && promoted.status < 400, `t1 promote -> t2 (HTTP ${promoted.status})`);

  // t2 must now hold the managed table with the same _dd_row_uuid values.
  ok(rowUuidColumnCount("t2") === "1", "t2 received the table WITH the _dd_row_uuid column (apply on PG)");
  const t2ManagedRows = psql(`SELECT count(*) FROM "t2"._dd_table_modes m JOIN "t2"._dd_entity_ids e ON e.uuid=m.table_uuid WHERE e.current_name='${tbl}' AND m.data_mode='managed'`);
  ok(t2ManagedRows === "1", "t2's table is now data_mode=managed");

  const t1Uuids = rowUuidsOf("t1");
  const t2Uuids = rowUuidsOf("t2");
  const sameUuids = t2Uuids.length === 2 && JSON.stringify(t1Uuids) === JSON.stringify(t2Uuids);
  ok(sameUuids, "t2 rows carry the SAME UUIDs as t1 (stable cross-tenant row identity)");

  // Tear down peers and tables, then report and exit.
  await clearPeer("t1", t2Env);
  await clearPeer("t2", t1Env);
  cleanup(tbl);
  console.log("\nRESULT: " + pass + " passed, " + fail + " failed");
  process.exit(fail ? 1 : 0);
};
/**
 * Entry point: self-skip when the environment is missing, otherwise run the
 * gate against a freshly named table.
 *
 * Exits 0 with a "SKIP: ..." line when PG_PORT is not reachable on 127.0.0.1
 * or when a trivial `psql` query fails. If `run` rejects, the test table is
 * cleaned up and the error is rethrown to the caller.
 *
 * @returns {Promise<void>}
 */
const main = async () => {
  // Print the skip reason and leave with a success status.
  const skip = (why) => {
    console.log("SKIP: " + why);
    process.exit(0);
  };

  const reachable = await portOpen(PG_PORT);
  if (!reachable) {
    skip("Postgres multi-tenant instance not reachable on 127.0.0.1:" + PG_PORT);
  }

  // Probe psql with a no-op query before relying on it for introspection.
  try {
    execFileSync("psql", ["-tAqc", "SELECT 1"], { env: PGENV });
  } catch (e) {
    skip("psql/Postgres not available for schema introspection");
  }

  // Timestamped name so repeated runs do not collide on the table.
  const tbl = "mr" + Date.now();
  try {
    await run(tbl);
  } catch (e) {
    cleanup(tbl);
    throw e;
  }
};
// Kick off the gate; any unexpected failure is reported with the most
// detailed description available and mapped to exit status 2.
main().catch((err) => {
  const detail = err && (err.stack || err.message || err);
  console.error("MANAGED-ROWS GATE ERROR:", detail);
  process.exit(2);
});