// Phase 2 gate: the MANAGED-ROWS (row-data sync) feature on multi-tenant
// Postgres. Proves the rowIdentity PG path + per-tenant isolation + cross-tenant
// row sync end to end:
//   1. mark a table managed on tenant t1 -> rowIdentity.ensureManagedSchema adds
//      the hidden _dd_row_uuid column to t1's tenant-schema table and backfills
//      existing rows with UUIDs (the SQLite-vs-PG portable column path), and the
//      initial ship journals a set_table_mode + one insert_row op per row;
//   2. ISOLATION: sibling tenant t2 gets no _dd_table_modes row for that table;
//   3. SYNC: promote t1 -> t2 (tenant-to-tenant peering, Phase 1) -> t2 applies
//      the ops, creating the table with the SAME _dd_row_uuid values (stable
//      cross-environment row identity) -- the apply-on-PG path.
//
// HTTP drives the feature; psql introspects each tenant's schema (the _dd_row_uuid
// column + row UUIDs aren't observable over HTTP). Self-skips if PG :3002 or psql
// is unavailable. Run: node test/managedRowsGate.js

const http = require("http");
const net = require("net");
const { execFileSync } = require("child_process");

// Port of the multi-tenant instance under test and the admin password used for
// both tenants' first-user bootstrap.
const PG_PORT = 3002;
const ADMIN_PW = "AdminP@ss1";

// Environment handed to every psql invocation (connection settings only).
const PGENV = Object.assign({}, process.env, {
  PGHOST: "/var/run/postgresql",
  PGUSER: "scott",
  PGDATABASE: "saltcorn_idp",
  PGPASSWORD: "peer"
});

// Host header value that selects tenant `t` on the instance.
const thost = (t) => t + ".localhost.localdomain:" + PG_PORT;

// Per-tenant cookie jars (cookie name -> value), filled by storeCookies.
const jars = { t1: {}, t2: {} };

let pass = 0;
let fail = 0;

// Record one check: bump the pass/fail counter and print the outcome.
const ok = (cond, msg) => {
  if (cond) {
    pass++;
    console.log("  PASS  " + msg);
  } else {
    fail++;
    console.log("  FAIL  " + msg);
  }
};

// Resolve true when a TCP connection to 127.0.0.1:port succeeds, false on
// error or after a 1 s socket timeout. Never rejects.
const portOpen = (port) => {
  return new Promise((resolve) => {
    const s = net.connect(port, "127.0.0.1");
    const done = (up) => {
      s.destroy();
      resolve(up);
    };
    s.setTimeout(1000);
    s.on("connect", () => done(true));
    s.on("timeout", () => done(false));
    s.on("error", () => done(false));
  });
};

// Run one SQL statement through psql (-t tuples only, -A unaligned, -q quiet)
// and return its trimmed stdout. Throws when psql exits non-zero.
const psql = (sql) => execFileSync("psql", ["-tAqc", sql], { env: PGENV }).toString().trim();

// Same as psql(), but split into trimmed, non-empty output lines.
const psqlLines = (sql) => psql(sql).split("\n").map((s) => s.trim()).filter(Boolean);

// Copy every name=value pair from a response's Set-Cookie headers into tenant
// t's jar; cookie attributes after the first ";" are ignored.
const storeCookies = (t, headers) => {
  const sc = headers["set-cookie"];
  if (!sc) {
    return;
  }
  for (const line of sc) {
    const pair = line.split(";")[0];
    const eq = pair.indexOf("=");
    if (eq > 0) {
      jars[t][pair.slice(0, eq).trim()] = pair.slice(eq + 1).trim();
    }
  }
};

// Issue an HTTP request against tenant t (selected via the Host header),
// replaying that tenant's stored cookies. opts.body, when given, is sent
// form-urlencoded; opts.headers are merged over the defaults.
// Resolves { status, headers, loc, body }; rejects on a socket error.
const request = (t, method, path, opts) => {
  const options = opts || {};
  return new Promise((resolve, reject) => {
    const headers = Object.assign({ Host: thost(t) }, options.headers || {});
    const jar = jars[t];
    if (Object.keys(jar).length > 0) {
      headers["Cookie"] = Object.keys(jar).map((k) => k + "=" + jar[k]).join("; ");
    }
    let data = null;
    if (options.body) {
      data = new URLSearchParams(options.body).toString();
      headers["Content-Type"] = "application/x-www-form-urlencoded";
      headers["Content-Length"] = Buffer.byteLength(data);
    }
    const r = http.request(
      { host: "127.0.0.1", port: PG_PORT, method: method, path: path, headers: headers },
      (resp) => {
        storeCookies(t, resp.headers);
        let body = "";
        resp.on("data", (c) => {
          body += c;
        });
        resp.on("end", () =>
          resolve({ status: resp.statusCode, headers: resp.headers, loc: resp.headers.location, body: body })
        );
      }
    );
    r.on("error", reject);
    if (data !== null) {
      r.write(data);
    }
    r.end();
  });
};

// Extract the hidden _csrf form token from a page, or "" when absent.
const csrfOf = (h) => {
  const m = h.match(/name="_csrf" value="([^"]+)"/);
  return m ? m[1] : "";
};

// Extract the "Ops recorded" counter from the dev-deploy page, or null.
// NOTE(review): the pattern as found had no opening <td> before the digits
// (a closing </td> with no opener); it looks like the tag was lost in transit.
// The opening cell tag is accepted as optional so both forms match -- confirm
// against the real page markup.
const opCountOf = (h) => {
  const m = h.match(/Ops recorded<\/th>\s*(?:<td[^>]*>)?\s*(\d+)\s*<\/td>/);
  return m ? parseInt(m[1], 10) : null;
};

// Extract this environment's env_id from the peers page, or "".
// NOTE(review): as with opCountOf, the opening <code> tag is treated as
// optional because the pattern as found only had the closing tag -- confirm.
const envIdOf = (h) => {
  const m = h.match(/env_id<\/strong> is (?:<code[^>]*>)?([0-9a-f-]{36})<\/code>/);
  return m ? m[1] : "";
};

// Extract the pairing secret shown after adding a peer, or "".
// NOTE(review): the pattern as found was split across lines (not valid JS) and
// its leading part was missing; only "([0-9a-f]+)<\/p>" survived. The opening
// <p ...> tag here is a reconstruction -- confirm against the real page markup.
const secretOf = (h) => {
  const m = h.match(/<p\b[^>]*>\s*([0-9a-f]+)\s*<\/p>/);
  return m ? m[1] : "";
};

// Fetch a fresh CSRF token for tenant t from the dev-deploy peers page.
const ddCsrf = async (t) => csrfOf((await request(t, "GET", "/admin/dev-deploy/peers")).body);

// True when /auth/authenticated reports a logged-in session for tenant t.
const authed = async (t) => /true/.test((await request(t, "GET", "/auth/authenticated")).body);

// Obtain an admin session on tenant t: try to log in, and if that does not
// authenticate, attempt the create-first-user flow. Returns whether the tenant
// ends up authenticated.
const bootstrap = async (t) => {
  const lp = await request(t, "GET", "/auth/login");
  await request(t, "POST", "/auth/login", {
    body: { email: "admin@" + t + ".local", password: ADMIN_PW, _csrf: csrfOf(lp.body) }
  });
  if (await authed(t)) {
    return true;
  }
  const cp = await request(t, "GET", "/auth/create_first_user");
  const cc = csrfOf(cp.body);
  if (cc) {
    await request(t, "POST", "/auth/create_first_user", {
      body: { email: "admin@" + t + ".local", password: ADMIN_PW, default_language: "en", _csrf: cc }
    });
  }
  return await authed(t);
};

// Find the peer_id of the peers-table row that mentions envId, or null.
// The page is scanned one <tr> at a time so the env id and the peer_id input
// are matched within the same row. (Splitting on "" -- as the code was found --
// yields single characters, so nothing could ever match.)
// An empty envId is rejected: it would match every row and hand back an
// unrelated peer, which clearPeer would then delete.
const peerIdFor = (html, envId) => {
  if (!envId) {
    return null;
  }
  for (const row of html.split(/<tr\b/i)) {
    if (row.includes(envId)) {
      const m = row.match(/name="peer_id" value="(\d+)"/);
      if (m) {
        return m[1];
      }
    }
  }
  return null;
};

// Delete tenant t's peer entry for envId if one is listed; no-op otherwise.
const clearPeer = async (t, envId) => {
  const pid = peerIdFor((await request(t, "GET", "/admin/dev-deploy/peers")).body, envId);
  if (pid) {
    await request(t, "POST", "/admin/dev-deploy/peers/delete", { body: { peer_id: pid, _csrf: await ddCsrf(t) } });
  }
};

// Ordered SQL that removes table `tbl` and its tracking rows from tenant
// schema `t`. The _dd_table_modes delete looks its rows up through
// _dd_entity_ids, so it must run BEFORE the _dd_entity_ids rows are removed;
// in the opposite order the subquery is empty and the mode rows are left behind.
const cleanupStatements = (t, tbl) => [
  `DROP TABLE IF EXISTS "${t}"."${tbl}"`,
  `DELETE FROM "${t}"._dd_table_modes WHERE table_uuid IN (SELECT uuid FROM "${t}"._dd_entity_ids WHERE current_name='${tbl}')`,
  `DELETE FROM "${t}"._dd_entity_ids WHERE current_name='${tbl}'`
];

// Best-effort removal of the test table and its tracking rows on both tenants.
// Each statement is attempted independently; failures are ignored on purpose
// (e.g. the table or tracking tables may not exist on a tenant).
const cleanup = (tbl) => {
  for (const t of ["t1", "t2"]) {
    for (const sql of cleanupStatements(t, tbl)) {
      try {
        psql(sql);
      } catch (e) {
        /* ignore */
      }
    }
  }
};

// The gate itself: create + seed a table on t1, mark it managed, verify the
// schema/backfill/journal effects and t2's isolation, then pair t1 -> t2,
// promote, and verify t2 received the same row UUIDs. Prints the result and
// exits the process with 1 if any check failed, 0 otherwise.
const run = async (tbl) => {
  ok(await bootstrap("t1"), "t1 admin session");
  ok(await bootstrap("t2"), "t2 admin session");

  // Create the table on t1 only; seed two rows directly (the wrap journals the
  // initial ship from these existing rows when we mark the table managed).
  const c1 = await ddCsrf("t1");
  const cr = await request("t1", "POST", "/table", { body: { name: tbl, _csrf: c1 } });
  ok(
    cr.status >= 300 && cr.status < 400 && /\/table\/\d+/.test(cr.loc || ""),
    "t1 created table " + tbl + " (HTTP " + cr.status + ")"
  );
  psql(`INSERT INTO "t1"."${tbl}" DEFAULT VALUES`);
  psql(`INSERT INTO "t1"."${tbl}" DEFAULT VALUES`);
  ok(psql(`SELECT count(*) FROM "t1"."${tbl}"`) === "2", "t1 seeded 2 rows");
  const tblUuid = psql(`SELECT uuid FROM "t1"._dd_entity_ids WHERE kind='table' AND current_name='${tbl}'`);
  ok(/^[0-9a-f-]{36}$/.test(tblUuid), "t1 tracked the table (uuid " + tblUuid + ")");

  // Baseline journal sizes, taken before the table is marked managed.
  const t1Ops0 = opCountOf((await request("t1", "GET", "/admin/dev-deploy/")).body);
  const t2Ops0 = opCountOf((await request("t2", "GET", "/admin/dev-deploy/")).body);

  // Mark managed on t1.
  const setr = await request("t1", "POST", "/admin/dev-deploy/tables/set", {
    body: { table_uuid: tblUuid, data_mode: "managed", _csrf: await ddCsrf("t1") }
  });
  const setMsg = decodeURIComponent(setr.loc || "");
  ok(
    setr.status >= 300 && setr.status < 400 && /managed/.test(setMsg) && !/err=/.test(setMsg),
    "t1 marked table managed (" + setMsg.replace(/^.*\?/, "") + ")"
  );

  // rowIdentity PG path: column added + every existing row backfilled.
  const colT1 = psql(
    `SELECT count(*) FROM information_schema.columns WHERE table_schema='t1' AND table_name='${tbl}' AND column_name='_dd_row_uuid'`
  );
  ok(colT1 === "1", "t1 table gained the _dd_row_uuid column (PG ensureManagedSchema)");
  ok(
    psql(`SELECT count(*) FROM "t1"."${tbl}" WHERE _dd_row_uuid IS NOT NULL`) === "2",
    "t1 backfilled both rows with UUIDs"
  );

  // Initial ship journaled: set_table_mode + 2 insert_row = 3 ops.
  // Both counts must have parsed: a null baseline would coerce to 0 in the
  // arithmetic and let the comparison pass without proving anything.
  const t1Ops1 = opCountOf((await request("t1", "GET", "/admin/dev-deploy/")).body);
  ok(
    t1Ops0 !== null && t1Ops1 !== null && t1Ops1 >= t1Ops0 + 3,
    "t1 journaled the managed ship (" + t1Ops0 + " -> " + t1Ops1 + ", +>=3)"
  );

  // ISOLATION: marking managed on t1 did not touch t2's _dd_table_modes.
  ok(
    psql(`SELECT count(*) FROM "t2"._dd_table_modes WHERE table_uuid='${tblUuid}'`) === "0",
    "t2 has NO _dd_table_modes row for the table (isolation)"
  );
  // null === null must not count as "unchanged".
  const t2Ops1 = opCountOf((await request("t2", "GET", "/admin/dev-deploy/")).body);
  ok(t2Ops0 !== null && t2Ops0 === t2Ops1, "t2 journal unchanged by t1's managed ship");

  // SYNC: pair t1 -> t2 and promote. t2 applies create_table + set_table_mode +
  // insert_row, recreating the table with the SAME row UUIDs.
  const t1Env = envIdOf((await request("t1", "GET", "/admin/dev-deploy/peers")).body);
  const t2Env = envIdOf((await request("t2", "GET", "/admin/dev-deploy/peers")).body);
  await clearPeer("t1", t2Env);
  await clearPeer("t2", t1Env);
  const addRes = await request("t1", "POST", "/admin/dev-deploy/peers/add", {
    body: { env_id: t2Env, label: "t2", base_url: "http://" + thost("t2"), _csrf: await ddCsrf("t1") }
  });
  const secret = secretOf(addRes.body);
  ok(/^[0-9a-f]{64}$/.test(secret), "t1 paired with t2");
  await request("t2", "POST", "/admin/dev-deploy/peers/add", {
    body: {
      env_id: t1Env,
      label: "t1",
      base_url: "http://" + thost("t1"),
      existing_secret: secret,
      _csrf: await ddCsrf("t2")
    }
  });
  const t2PeerId = peerIdFor((await request("t1", "GET", "/admin/dev-deploy/peers")).body, t2Env);
  const prom = await request("t1", "POST", "/admin/dev-deploy/promote", {
    body: { peer_id: t2PeerId, _csrf: await ddCsrf("t1") }
  });
  ok(prom.status >= 200 && prom.status < 400, "t1 promote -> t2 (HTTP " + prom.status + ")");

  // t2 now has the table, managed, with the SAME _dd_row_uuid values as t1.
  const colT2 = psql(
    `SELECT count(*) FROM information_schema.columns WHERE table_schema='t2' AND table_name='${tbl}' AND column_name='_dd_row_uuid'`
  );
  ok(colT2 === "1", "t2 received the table WITH the _dd_row_uuid column (apply on PG)");
  ok(
    psql(
      `SELECT count(*) FROM "t2"._dd_table_modes m JOIN "t2"._dd_entity_ids e ON e.uuid=m.table_uuid WHERE e.current_name='${tbl}' AND m.data_mode='managed'`
    ) === "1",
    "t2's table is now data_mode=managed"
  );
  const u1 = psqlLines(`SELECT _dd_row_uuid FROM "t1"."${tbl}" ORDER BY _dd_row_uuid`);
  const u2 = psqlLines(`SELECT _dd_row_uuid FROM "t2"."${tbl}" ORDER BY _dd_row_uuid`);
  ok(
    u2.length === 2 && JSON.stringify(u1) === JSON.stringify(u2),
    "t2 rows carry the SAME UUIDs as t1 (stable cross-tenant row identity)"
  );

  // Cleanup peers + tables.
  await clearPeer("t1", t2Env);
  await clearPeer("t2", t1Env);
  cleanup(tbl);

  console.log("\nRESULT: " + pass + " passed, " + fail + " failed");
  process.exit(fail ? 1 : 0);
};

// Entry point: self-skip (exit 0) when the instance port or psql is
// unavailable; otherwise run the gate against a uniquely named table, cleaning
// the table up if the run throws.
const main = async () => {
  if (!(await portOpen(PG_PORT))) {
    console.log("SKIP: Postgres multi-tenant instance not reachable on 127.0.0.1:" + PG_PORT);
    process.exit(0);
  }
  try {
    execFileSync("psql", ["-tAqc", "SELECT 1"], { env: PGENV });
  } catch (e) {
    console.log("SKIP: psql/Postgres not available for schema introspection");
    process.exit(0);
  }
  const tbl = "mr" + Date.now();
  try {
    await run(tbl);
  } catch (e) {
    cleanup(tbl);
    throw e;
  }
};

main().catch((e) => {
  console.error("MANAGED-ROWS GATE ERROR:", e && (e.stack || e.message || e));
  process.exit(2);
});