sc-dev-deploy/lib/routes.js

1040 lines
51 KiB
JavaScript

// HTTP routes for dev-deploy.
//
// Admin UI (session + admin role):
// GET /admin/dev-deploy/
// GET /admin/dev-deploy/ops
// GET /admin/dev-deploy/peers
// POST /admin/dev-deploy/peers/add
// POST /admin/dev-deploy/peers/rotate
// POST /admin/dev-deploy/peers/delete
// GET /admin/dev-deploy/plan
// POST /admin/dev-deploy/promote
//
// Machine API (HMAC peer auth):
// GET /dev-deploy/api/journal?since=op_id
// POST /dev-deploy/api/ingest
const db = require("@saltcorn/data/db");
const { PLUGIN_NAME, PLUGIN_VERSION } = require("./constants");
const { getEnv } = require("./env");
const peers = require("./peers");
const { requirePeerAuth } = require("./peerAuth");
const { signedFetch } = require("./transport");
const { applyBatch, resolveConflict, resolveConflictByMerge, conflictFieldDiff } = require("./apply");
const { revertOp } = require("./revert");
const { DATA_MODES } = require("./constants");
// Saltcorn native markup primitives -- the same ones core admin pages use, so
// these pages inherit the active theme by construction (mkTable/renderForm/
// post_btn instead of hand-rolled HTML + manual Bootstrap classes).
const { mkTable, post_btn, renderForm, link, alert, badge, tags } = require("@saltcorn/markup");
const { p, code, pre, div, span, strong } = tags;
const Form = require("@saltcorn/data/models/form");
const getInboundAnchor = async (peerId) => {
return await db.selectMaybeOne("_dd_anchors", { peer_id: peerId, direction: "inbound" });
};
const getOutboundAnchor = async (peerId) => {
return await db.selectMaybeOne("_dd_anchors", { peer_id: peerId, direction: "outbound" });
};
const upsertAnchor = async (peerId, direction, opId) => {
const now = new Date().toISOString();
const existing = await db.selectMaybeOne("_dd_anchors", { peer_id: peerId, direction: direction });
if (existing) {
await db.updateWhere("_dd_anchors", { last_op_id: opId, updated_at: now }, { peer_id: peerId, direction: direction });
} else {
await db.insert("_dd_anchors", { peer_id: peerId, direction: direction, last_op_id: opId, updated_at: now }, { noid: true });
}
};
// Safety cap on the number of journal pages a single pull will fetch. The peer's
// apiJournal caps each response at 1000 ops; pull loops until drained or this cap.
const PULL_MAX_ITERS = 100;
const isAdmin = (req) => !!(req && req.user && req.user.role_id === 1);
const escape = (s) => {
if (s === null || s === undefined) return "";
return String(s)
.replace(/&/g, "&")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")
.replace(/"/g, "&quot;");
};
const csrfField = (req) => {
const t = req.csrfToken ? req.csrfToken() : "";
return `<input type="hidden" name="_csrf" value="${escape(t)}">`;
};
// Bootstrap nav-pills with the current page marked active. The active link is the
// one whose href is the LONGEST boundary-safe prefix of the request path, so a
// sub-page (e.g. /conflicts/merge) still highlights its parent tab.
const navPills = (req, links) => {
const cur = String((req && (req.originalUrl || req.path)) || "").split("?")[0].replace(/\/+$/, "");
let activeHref = "";
for (const [href] of links) {
const h = href.replace(/\/+$/, "");
if ((cur === h || cur.startsWith(h + "/")) && h.length > activeHref.length) {
activeHref = h;
}
}
const items = links.map(([href, label]) => {
const on = href.replace(/\/+$/, "") === activeHref;
return ` <li class="nav-item"><a class="nav-link${on ? " active" : ""}"${on ? ' aria-current="page"' : ""} href="${escape(href)}">${escape(label)}</a></li>`;
}).join("\n");
return `<ul class="nav nav-pills mb-3">\n${items}\n</ul>`;
};
const DD_NAV = [
["/admin/dev-deploy/", "Dashboard"],
["/admin/dev-deploy/ops", "Journal"],
["/admin/dev-deploy/peers", "Peers"],
["/admin/dev-deploy/plan", "Plan"],
["/admin/dev-deploy/tables", "Tables"],
["/admin/dev-deploy/conflicts", "Conflicts"],
];
// Render an admin page in the ACTIVE Saltcorn theme: breadcrumbs + sub-nav +
// flash alerts (from ?msg/?err) + the given content segments (cards or strings),
// all via res.sendWrap structured layout. This is the one rendering path; pages
// build their content with mkTable / renderForm / post_btn (no raw page HTML).
const adminPage = (req, res, title, ...segments) => {
const above = [
{ type: "breadcrumbs", crumbs: [{ text: "Settings", href: "/settings" }, { text: "dev-deploy" }, { text: title }] },
navPills(req, DD_NAV),
];
if (req.query.msg) above.push(alert("success", escape(req.query.msg)));
if (req.query.err) above.push(alert("danger", escape(req.query.err)));
res.sendWrap(`dev-deploy ${title}`, { above: above.concat(segments) });
};
// ---------------- Admin dashboard ----------------
const dashboard = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
const env = await getEnv();
const schema = db.getTenantSchemaPrefix();
const opCount = (await db.query(`SELECT COUNT(*) AS c FROM ${schema}_dd_ops`)).rows[0].c;
const opsByKind = (await db.query(`SELECT op_type, COUNT(*) AS c FROM ${schema}_dd_ops GROUP BY op_type ORDER BY op_type`)).rows;
const entCounts = (await db.query(`SELECT kind, COUNT(*) AS c FROM ${schema}_dd_entity_ids GROUP BY kind ORDER BY kind`)).rows;
const peerList = await peers.listPeers();
const conflictCount = (await db.query(`SELECT COUNT(*) AS c FROM ${schema}_dd_ops WHERE status='conflict'`)).rows[0].c;
const envRows = [
{ k: "Env ID", v: code(escape(env ? env.env_id : "?")) },
{ k: "Label", v: env && env.env_label ? escape(env.env_label) : span({ class: "text-muted" }, "(unset)") },
{ k: "Destructive-op policy", v: span({ class: "badge bg-info" }, escape(env ? env.on_destructive_op : "?")) },
{ k: "Require TLS (default)", v: env && env.require_tls ? "yes" : "no" },
{ k: "Bootstrapped at", v: escape(env ? env.bootstrapped_at : "") },
{ k: "Ops recorded", v: escape(opCount) },
{ k: "Peers configured", v: escape(peerList.length) },
{ k: "Pending conflicts", v: conflictCount > 0 ? link("/admin/dev-deploy/conflicts", strong(escape(conflictCount))) : "0" },
];
const kv = mkTable([{ label: "Setting", key: "k" }, { label: "Value", key: (r) => r.v }], envRows);
const countTable = (rows, c1) => rows.length === 0
? p({ class: "text-muted" }, "None")
: mkTable([{ label: c1, key: (r) => escape(r[c1]) }, { label: "count", key: (r) => escape(r.c) }], rows);
adminPage(req, res, "dashboard",
{ type: "card", title: "Environment", contents: kv },
{ type: "card", title: "Ops by type", contents: countTable(opsByKind, "op_type") },
{ type: "card", title: "Entities tracked", contents: countTable(entCounts, "kind") }
);
};
// ---------------- Ops viewer ----------------
const fetchOps = async (limit, since, offset) => {
const schema = db.getTenantSchemaPrefix();
let sql = `SELECT op_id, source_env_id, op_type, entity_kind, entity_uuid, payload, parent_op_id, correlation_id, schema_version, created_at, applied_at, status FROM ${schema}_dd_ops`;
const params = [];
if (since) {
const anchor = (await db.query(`SELECT created_at FROM ${schema}_dd_ops WHERE op_id = $1`, [since])).rows[0];
if (anchor) {
sql += ` WHERE created_at > $${params.length + 1}`;
params.push(anchor.created_at);
}
}
sql += ` ORDER BY created_at DESC LIMIT $${params.length + 1}`;
params.push(limit || 100);
sql += ` OFFSET $${params.length + 1}`;
params.push(offset || 0);
return (await db.query(sql, params)).rows;
};
const opsView = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
const wantJson = (req.headers.accept || "").includes("application/json");
const limit = Math.min(parseInt(req.query.limit || "100", 10) || 100, 500);
const offset = Math.max(parseInt(req.query.offset || "0", 10) || 0, 0);
const since = req.query.since;
const ops = await fetchOps(limit, since, offset);
if (wantJson) { res.json({ ops: ops }); return; }
const opsTable = ops.length === 0
? p({ class: "text-muted" }, "Journal is empty")
: mkTable([
{ label: "op", key: (o) => code(escape(o.op_id.slice(0, 8))) },
{ label: "op_type", key: (o) => escape(o.op_type) },
{ label: "entity", key: (o) => code(escape((o.entity_uuid || "").slice(0, 8))) },
{ label: "parent", key: (o) => code(escape((o.parent_op_id || "").slice(0, 8))) },
{ label: "status", key: (o) => escape(o.status) },
{ label: "created", key: (o) => escape(o.created_at) },
{ label: "actions", key: (o) => post_btn("/admin/dev-deploy/revert", "Revert", req.csrfToken(), { req, btnClass: "btn-danger", small: true, formClass: "d-inline", body: { op_id: o.op_id }, confirm: true }) },
{ label: "payload", key: (o) => pre({ class: "mb-0 p-1 bg-light text-dark", style: "white-space:pre-wrap;word-break:break-all;max-width:32rem" }, escape(o.payload)) },
], ops);
const qs = (off) => {
const parts = [`offset=${off}`, `limit=${limit}`];
if (since) { parts.push(`since=${encodeURIComponent(since)}`); }
return "/admin/dev-deploy/ops?" + parts.join("&");
};
const prevLink = offset > 0 ? link(qs(Math.max(offset - limit, 0)), "&laquo; Prev") : span({ class: "text-muted" }, "&laquo; Prev");
const nextLink = ops.length === limit ? link(qs(offset + limit), "Next &raquo;") : span({ class: "text-muted" }, "Next &raquo;");
adminPage(req, res, "journal",
{ type: "card", title: "Journal", contents:
p({ class: "text-muted" }, `Showing up to ${escape(limit)} ops${since ? `, since op ${code(escape(since.slice(0, 8)))}` : ""}, offset ${escape(offset)}. Newest first. Revert appends a compensating op rather than rewriting history.`) +
opsTable +
p(prevLink, " &nbsp; ", nextLink) }
);
};
// ---------------- Peers ----------------
const peersView = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
const env = await getEnv();
const list = await peers.listPeers();
const actBtn = (peer, url, label, cls) =>
post_btn(url, label, req.csrfToken(), { req, btnClass: cls, small: true, formClass: "d-inline me-1", body: { peer_id: peer.peer_id } });
const peerTable = mkTable([
{ label: "id", key: "peer_id" },
{ label: "label", key: (peer) => escape(peer.label || "(unset)") },
{ label: "env_id", key: (peer) => code(escape(peer.env_id)) },
{ label: "base_url", key: (peer) => escape(peer.base_url) },
{ label: "last seen", key: (peer) => escape(peer.last_seen_at || "never") },
{ label: "actions", key: (peer) =>
actBtn(peer, "/admin/dev-deploy/promote", "Promote", "btn-primary") +
actBtn(peer, "/admin/dev-deploy/pull", "Pull", "btn-secondary") +
actBtn(peer, "/admin/dev-deploy/peers/rotate", "Rotate", "btn-warning") +
post_btn("/admin/dev-deploy/peers/delete", "Delete", req.csrfToken(), { req, btnClass: "btn-danger", small: true, formClass: "d-inline", body: { peer_id: peer.peer_id }, confirm: true }),
},
], list);
const addForm = new Form({
action: "/admin/dev-deploy/peers/add",
submitLabel: "Pair",
blurb: "Leave the shared secret blank to generate one (shown once after submit). Paste the same secret in the peer's own pairing form.",
fields: [
{ name: "env_id", label: "Peer env_id", type: "String", required: true },
{ name: "label", label: "Label", type: "String", attributes: { placeholder: "test, prod, etc." } },
{ name: "base_url", label: "Base URL", type: "String", required: true, attributes: { placeholder: "http://localhost:3001" } },
{ name: "require_tls", label: "Require TLS", type: "Bool" },
{ name: "existing_secret", label: "Existing secret (hex)", type: "String", attributes: { placeholder: "64 hex characters" } },
],
});
adminPage(req, res, "peers",
{ type: "card", title: "Peers", contents:
p("This instance's ", strong("env_id"), " is ", code(escape(env ? env.env_id : "?")), ". Paste this into the other instance's peer form.") + peerTable },
{ type: "card", title: "Add peer", contents: renderForm(addForm, req.csrfToken()) }
);
};
const peersAdd = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const envId = (req.body.env_id || "").trim();
const label = (req.body.label || "").trim() || null;
const baseUrl = (req.body.base_url || "").trim();
const requireTls = !!req.body.require_tls;
const provided = (req.body.existing_secret || "").trim();
let existingSecret = null;
if (provided) {
if (!/^[0-9a-fA-F]{64}$/.test(provided)) {
throw new Error("existing_secret must be 64 hex characters");
}
existingSecret = Buffer.from(provided, "hex");
}
const { peer, secret } = await peers.addPeer({ envId: envId, label: label, baseUrl: baseUrl, requireTls: requireTls, existingSecret: existingSecret });
const secretHex = secret.toString("hex");
adminPage(req, res, "peer paired",
{ type: "card", title: `Peer ${escape(peer.label || peer.env_id)} paired`, contents:
p("Copy this secret into the peer's pairing form (it will not be shown again):") +
pre({ class: "user-select-all p-2 bg-light text-dark border rounded", style: "white-space:pre-wrap;word-break:break-all" }, escape(secretHex)) +
p("If this is a brand-new pairing, give the peer this side's env_id too.") +
p(link("/admin/dev-deploy/peers", "Back to peers")) }
);
} catch (err) {
res.redirect("/admin/dev-deploy/peers?err=" + encodeURIComponent(err.message));
}
};
const peersRotate = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const peerId = parseInt(req.body.peer_id, 10);
const { peer, secret } = await peers.rotatePeerSecret(peerId);
adminPage(req, res, "secret rotated",
{ type: "card", title: `Peer ${escape(peer.label || peer.env_id)} secret rotated`, contents:
p("New secret (shown once):") +
pre({ class: "user-select-all p-2 bg-light text-dark border rounded", style: "white-space:pre-wrap;word-break:break-all" }, escape(secret.toString("hex"))) +
p("Paste this on the other side via Rotate or by re-pairing.") +
p(link("/admin/dev-deploy/peers", "Back to peers")) }
);
} catch (err) {
res.redirect("/admin/dev-deploy/peers?err=" + encodeURIComponent(err.message));
}
};
const peersDelete = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const peerId = parseInt(req.body.peer_id, 10);
await peers.deletePeer(peerId);
res.redirect("/admin/dev-deploy/peers?msg=" + encodeURIComponent("peer deleted"));
} catch (err) {
res.redirect("/admin/dev-deploy/peers?err=" + encodeURIComponent(err.message));
}
};
// ---------------- Plan + promote ----------------
const planView = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
const peerIdRaw = req.query.peer;
const peerList = await peers.listPeers();
if (!peerIdRaw) {
const selForm = new Form({
methodGET: true,
action: "/admin/dev-deploy/plan",
submitLabel: "Show plan",
fields: [
{ name: "peer", label: "Peer", input_type: "select",
options: peerList.map((peer) => ({ label: peer.label || peer.env_id, value: String(peer.peer_id) })) },
],
});
adminPage(req, res, "plan",
{ type: "card", title: "Plan", contents:
peerList.length === 0 ? p({ class: "text-muted" }, "No peers configured.") : renderForm(selForm, req.csrfToken()) }
);
return;
}
const peerId = parseInt(peerIdRaw, 10);
const peer = await peers.findPeer(peerId);
if (!peer) { res.status(404).send("peer not found"); return; }
const limit = Math.min(parseInt(req.query.limit || "100", 10) || 100, 500);
const offset = Math.max(parseInt(req.query.offset || "0", 10) || 0, 0);
const env = await getEnv();
// Anchor: the last_op_id we sent outbound to this peer (or epoch)
const anchor = await db.selectMaybeOne("_dd_anchors", { peer_id: peerId, direction: "outbound" });
const schema = db.getTenantSchemaPrefix();
let sql = `SELECT op_id, op_type, entity_kind, entity_uuid, payload, parent_op_id, status, created_at FROM ${schema}_dd_ops WHERE source_env_id = $1`;
const params = [env.env_id];
if (anchor) {
const anchorRow = await db.selectMaybeOne("_dd_ops", { op_id: anchor.last_op_id });
if (anchorRow) {
sql += ` AND created_at > $${params.length + 1}`;
params.push(anchorRow.created_at);
}
}
sql += ` ORDER BY created_at ASC LIMIT $${params.length + 1}`;
params.push(limit);
sql += ` OFFSET $${params.length + 1}`;
params.push(offset);
const planRows = (await db.query(sql, params)).rows;
const planTable = planRows.length === 0
? p({ class: "text-muted" }, "No new ops to send")
: mkTable([
{ label: "op", key: (o) => code(escape(o.op_id.slice(0, 8))) },
{ label: "op_type", key: (o) => escape(o.op_type) },
{ label: "entity", key: (o) => code(escape((o.entity_uuid || "").slice(0, 8))) },
{ label: "status", key: (o) => escape(o.status) },
{ label: "created", key: (o) => escape(o.created_at) },
], planRows);
const promoteBtn = planRows.length === 0 ? "" :
post_btn("/admin/dev-deploy/promote", `Promote ${escape(planRows.length)} op${planRows.length === 1 ? "" : "s"} to ${escape(peer.label || peer.env_id)}`, req.csrfToken(), { req, btnClass: "btn-primary", body: { peer_id: peerId } });
adminPage(req, res, "plan",
{ type: "card", title: `Plan: promote to ${escape(peer.label || peer.env_id)}`, contents:
p("Anchor: ", anchor ? code(escape(anchor.last_op_id.slice(0, 8))) : span({ class: "text-muted" }, "(none - will send from epoch)")) +
p(`Ops that would be sent: ${escape(planRows.length)}`) +
planTable +
promoteBtn }
);
};
const promote = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const peerId = parseInt(req.body.peer_id, 10);
const peer = await peers.findPeer(peerId);
if (!peer) throw new Error(`peer ${peerId} not found`);
const env = await getEnv();
const anchor = await getOutboundAnchor(peerId);
const schema = db.getTenantSchemaPrefix();
let sql = `SELECT op_id, source_env_id, op_type, entity_kind, entity_uuid, payload, parent_op_id, correlation_id, schema_version, created_at, status FROM ${schema}_dd_ops WHERE source_env_id = $1`;
const params = [env.env_id];
if (anchor) {
const anchorRow = await db.selectMaybeOne("_dd_ops", { op_id: anchor.last_op_id });
if (anchorRow) {
sql += ` AND created_at > $${params.length + 1}`;
params.push(anchorRow.created_at);
}
}
sql += ` ORDER BY created_at ASC LIMIT 500`;
const ops = (await db.query(sql, params)).rows;
if (ops.length === 0) {
res.redirect("/admin/dev-deploy/peers?msg=" + encodeURIComponent("no ops to promote"));
return;
}
const secret = await peers.peerSecret(peerId);
const r = await signedFetch({
baseUrl: peer.base_url,
method: "POST",
path: "/dev-deploy/api/ingest",
body: { ops: ops },
sourceEnvId: env.env_id,
secret: secret,
requireTls: peer.require_tls
});
if (!r.ok) {
throw new Error(`peer responded ${r.status}: ${JSON.stringify(r.body)}`);
}
await upsertAnchor(peerId, "outbound", ops[ops.length - 1].op_id);
const applied = (r.body && r.body.results || []).filter((x) => x.status === "applied").length;
const errors = (r.body && r.body.results || []).filter((x) => x.status === "error").length;
let msg = `promoted ${ops.length} ops (${applied} applied, ${errors} errors)`;
const localPlugins = (await db.query(`SELECT name, source, version FROM _sc_plugins ORDER BY name`)).rows;
const warnings = await diffPluginsWithPeer(peer, env, localPlugins);
if (warnings.length > 0) {
msg += " | WARNINGS: " + warnings.join("; ");
}
res.redirect("/admin/dev-deploy/peers?msg=" + encodeURIComponent(msg));
} catch (err) {
res.redirect("/admin/dev-deploy/peers?err=" + encodeURIComponent(err.message));
}
};
const pull = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const peerId = parseInt(req.body.peer_id, 10);
const peer = await peers.findPeer(peerId);
if (!peer) throw new Error(`peer ${peerId} not found`);
const env = await getEnv();
const secret = await peers.peerSecret(peerId);
// apiJournal caps each response at 1000 ops, so loop: fetch from the
// advancing inbound anchor, apply, advance the anchor, repeat until a
// fetch returns 0 ops (drained) or we hit the safety cap.
let total = 0;
let applied = 0;
let errors = 0;
let conflicts = 0;
let iters = 0;
let cappedOut = false;
for (;;) {
if (iters >= PULL_MAX_ITERS) {
cappedOut = true;
break;
}
iters += 1;
const anchor = await getInboundAnchor(peerId);
const since = anchor ? anchor.last_op_id : null;
const path = since ? `/dev-deploy/api/journal?since=${encodeURIComponent(since)}` : "/dev-deploy/api/journal";
const r = await signedFetch({
baseUrl: peer.base_url,
method: "GET",
path: path,
body: null,
sourceEnvId: env.env_id,
secret: secret,
requireTls: peer.require_tls
});
if (!r.ok) {
throw new Error(`peer responded ${r.status}: ${JSON.stringify(r.body)}`);
}
const ops = (r.body && r.body.ops) || [];
if (ops.length === 0) {
break;
}
const results = await applyBatch(ops, { peerId: peerId, myEnvId: env.env_id });
applied += results.filter((x) => x.status === "applied").length;
errors += results.filter((x) => x.status === "error").length;
conflicts += results.filter((x) => x.status === "conflict").length;
total += ops.length;
await upsertAnchor(peerId, "inbound", ops[ops.length - 1].op_id);
}
if (total === 0) {
res.redirect("/admin/dev-deploy/peers?msg=" + encodeURIComponent("nothing to pull"));
return;
}
let sum = `pulled ${total} ops (${applied} applied, ${errors} errors, ${conflicts} conflicts)`;
if (cappedOut) {
sum += " (stopped at safety cap; pull again)";
}
const localPlugins = (await db.query(`SELECT name, source, version FROM _sc_plugins ORDER BY name`)).rows;
const warnings = await diffPluginsWithPeer(peer, env, localPlugins);
if (warnings.length > 0) {
sum += " | WARNINGS: " + warnings.join("; ");
}
const dest = conflicts > 0 ? "/admin/dev-deploy/conflicts?msg=" : "/admin/dev-deploy/peers?msg=";
res.redirect(dest + encodeURIComponent(sum));
} catch (err) {
res.redirect("/admin/dev-deploy/peers?err=" + encodeURIComponent(err.message));
}
};
// ---------------- Conflicts ----------------
const conflictsView = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
const schema = db.getTenantSchemaPrefix();
const conflicts = (await db.query(`
SELECT i.op_id AS i_op_id,
i.source_env_id AS i_source,
i.op_type AS i_op_type,
i.entity_kind AS i_kind,
i.entity_uuid AS i_uuid,
i.payload AS i_payload,
i.created_at AS i_created,
l.op_id AS l_op_id,
l.op_type AS l_op_type,
l.payload AS l_payload,
l.applied_at AS l_applied
FROM ${schema}_dd_ops i
LEFT JOIN ${schema}_dd_ops l ON l.op_id = i.conflict_with_op_id
WHERE i.status = 'conflict'
ORDER BY i.created_at ASC
`)).rows;
const isMergeable = (c) =>
c.i_op_type && c.l_op_type &&
c.i_op_type.startsWith("update_") &&
c.l_op_type.startsWith("update_") &&
c.i_op_type === c.l_op_type;
const conflictsTable = conflicts.length === 0
? p({ class: "text-muted" }, "No pending conflicts")
: mkTable([
{ label: "Theirs", key: (c) =>
strong("incoming") + " " + code(escape(c.i_op_id.slice(0, 8))) + " from " + code(escape((c.i_source || "").slice(0, 8))) + "<br>" +
escape(c.i_op_type) + " on entity " + code(escape((c.i_uuid || "").slice(0, 8))) + "<br>" +
span({ class: "text-muted" }, "created " + escape(c.i_created)) +
pre({ class: "mb-0 p-1 bg-light text-dark", style: "white-space:pre-wrap;word-break:break-all" }, escape(c.i_payload)) },
{ label: "Mine", key: (c) => c.l_op_id
? strong("local") + " " + code(escape(c.l_op_id.slice(0, 8))) + "<br>" + escape(c.l_op_type) + "<br>" +
span({ class: "text-muted" }, "applied " + escape(c.l_applied || "")) +
pre({ class: "mb-0 p-1 bg-light text-dark", style: "white-space:pre-wrap;word-break:break-all" }, escape(c.l_payload))
: span({ class: "text-muted" }, "(no local op recorded)") },
{ label: "Action", key: (c) =>
(isMergeable(c) ? link("/admin/dev-deploy/conflicts/merge?op_id=" + encodeURIComponent(c.i_op_id), "Merge per field") + "<br><br>" : "") +
post_btn("/admin/dev-deploy/conflicts/resolve", "Use theirs", req.csrfToken(), { req, btnClass: "btn-primary", small: true, formClass: "d-inline me-1", body: { op_id: c.i_op_id, action: "theirs" } }) +
post_btn("/admin/dev-deploy/conflicts/resolve", "Use mine", req.csrfToken(), { req, btnClass: "btn-secondary", small: true, formClass: "d-inline", body: { op_id: c.i_op_id, action: "mine" } }) },
], conflicts);
const intro = `<p>A conflict means an incoming op and a local op both touched the same entity since the last sync. The incoming op was NOT applied; pick which version wins.</p>
<ul>
<li><strong>Use theirs</strong>: applies the incoming op now (overwrites local change). The local op stays in the journal but its effect is overwritten.</li>
<li><strong>Use mine</strong>: marks the incoming op as <code>rejected</code>. The local state stands. The peer may re-send the op on future syncs; subsequent pulls will skip it via idempotency.</li>
</ul>`;
adminPage(req, res, "conflicts",
{ type: "card", title: "Pending conflicts", contents: intro + conflictsTable }
);
};
const conflictsResolve = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const opId = (req.body.op_id || "").trim();
const action = (req.body.action || "").trim();
if (!opId) throw new Error("op_id required");
if (!["theirs", "mine"].includes(action)) throw new Error("action must be 'theirs' or 'mine'");
// For file ops, "use theirs" re-applies create_file which fetches bytes
// from the originating peer -- resolve the peer from the op's source env.
const op = await db.selectMaybeOne("_dd_ops", { op_id: opId });
const peer = op ? await peers.findPeerByEnvId(op.source_env_id) : null;
const env = await getEnv();
const opts = {
peerId: peer ? peer.peer_id : null,
myEnvId: env ? env.env_id : null
};
const r = await resolveConflict(opId, action, opts);
res.redirect("/admin/dev-deploy/conflicts?msg=" + encodeURIComponent(`resolved ${opId.slice(0, 8)} with action=${action}: ${JSON.stringify(r)}`));
} catch (err) {
res.redirect("/admin/dev-deploy/conflicts?err=" + encodeURIComponent(err.message));
}
};
const renderValue = (v) => {
if (v === null || v === undefined) {
return '<span class="text-muted">(unset)</span>';
}
if (typeof v === "object") {
return `<pre class="mb-0 p-1 bg-light text-dark" style="white-space:pre-wrap;word-break:break-all">${escape(JSON.stringify(v, null, 2))}</pre>`;
}
return escape(String(v));
};
const conflictsMergeView = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
const opId = (req.query.op_id || "").trim();
if (!opId) { res.redirect("/admin/dev-deploy/conflicts?err=op_id+required"); return; }
try {
const op = await db.selectMaybeOne("_dd_ops", { op_id: opId });
if (!op) throw new Error(`op ${opId} not found`);
if (op.status !== "conflict") throw new Error(`op ${opId} is not in conflict status`);
const diff = await conflictFieldDiff(op);
const diffs = diff.diffs || [];
const ent = diff.instance
? `<p>Entity: <code>${escape(diff.kind)}</code> <strong>${escape(diff.instance.name || diff.instance.role || diff.instance.id)}</strong> (local id ${escape(diff.instance.id)})</p>`
: `<p class="alert alert-danger">${escape(diff.reason || "no entity diff available")}</p>`;
let formBody;
if (diffs.length === 0) {
formBody = `<p class="text-muted">No field-level differences detected (current state already matches the incoming op's patch on every field). Applying the merge will just mark this conflict as resolved.</p>`;
} else {
const rows = diffs.map((d) => `<tr>
<td><code>${escape(d.field)}</code></td>
<td>${renderValue(d.currentValue)}</td>
<td>${renderValue(d.incomingValue)}</td>
<td>
<label><input type="radio" class="form-check-input me-1" name="choice_${escape(d.field)}" value="current" checked> keep current</label><br>
<label><input type="radio" class="form-check-input me-1" name="choice_${escape(d.field)}" value="incoming"> take incoming</label><br>
<label><input type="radio" class="form-check-input me-1" name="choice_${escape(d.field)}" value="custom"> custom:
<input type="text" class="form-control form-control-sm d-inline-block w-auto" name="custom_${escape(d.field)}" value="${escape(typeof d.currentValue === "string" ? d.currentValue : "")}">
</label>
</td>
</tr>`).join("");
formBody = `
<table class="table table-sm table-bordered">
<tr><th>field</th><th>current (mine)</th><th>incoming (theirs)</th><th>resolution</th></tr>
${rows}
</table>
<p>Defaults to <strong>keep current</strong> for every field — submit as-is for a no-op resolution that just clears the conflict marker.</p>
`;
}
const contents = `${ent}
<p>Incoming op <code>${escape(op.op_id.slice(0, 8))}</code> from <code>${escape((op.source_env_id || "").slice(0, 8))}</code>; ${escape(op.op_type)}.</p>
<form method="post" action="/admin/dev-deploy/conflicts/merge/apply">
${csrfField(req)}
<input type="hidden" name="op_id" value="${escape(op.op_id)}">
${formBody}
<p>
<button type="submit" class="btn btn-primary">Apply merge</button>
<a class="btn btn-link" href="/admin/dev-deploy/conflicts">Cancel</a>
</p>
</form>`;
adminPage(req, res, "merge", { type: "card", title: "Merge conflict per field", contents: contents });
} catch (err) {
res.redirect("/admin/dev-deploy/conflicts?err=" + encodeURIComponent(err.message));
}
};
// Parse number/boolean/null literal strings from a text input. JSON.parse first
// (handles "true", "42", "null"); fall back to the raw string.
const coerce = (s) => {
if (s === undefined || s === null) return s;
try {
const parsed = JSON.parse(s);
if (parsed === null || ["boolean", "number"].includes(typeof parsed)) return parsed;
return s;
} catch (e) {
return s;
}
};
const conflictsMergeApply = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const opId = (req.body.op_id || "").trim();
if (!opId) throw new Error("op_id required");
const op = await db.selectMaybeOne("_dd_ops", { op_id: opId });
if (!op) throw new Error(`op ${opId} not found`);
if (op.status !== "conflict") throw new Error(`op ${opId} is not in conflict status`);
const payload = typeof op.payload === "string" ? JSON.parse(op.payload) : (op.payload || {});
const incomingPatch = payload.patch || {};
// For each "choice_<field>" entry in the form body, decide what value
// to write -- if any. "current" means don't touch the field.
const choices = {};
for (const [k, v] of Object.entries(req.body || {})) {
if (!k.startsWith("choice_")) continue;
const field = k.substring("choice_".length);
if (v === "incoming") {
choices[field] = incomingPatch[field];
} else if (v === "custom") {
const customVal = req.body[`custom_${field}`];
choices[field] = coerce(customVal);
}
}
const r = await resolveConflictByMerge(opId, choices);
res.redirect("/admin/dev-deploy/conflicts?msg=" + encodeURIComponent(`merged ${opId.slice(0, 8)}: ${JSON.stringify(r)}`));
} catch (err) {
res.redirect("/admin/dev-deploy/conflicts?err=" + encodeURIComponent(err.message));
}
};
// ---------------- Tables (data_mode) ----------------
const tablesView = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
const schema = db.getTenantSchemaPrefix();
const rows = (await db.query(`
SELECT e.uuid, e.current_name, e.current_id,
COALESCE(m.data_mode, 'user') AS data_mode,
m.updated_at,
m.starter_shipped_at
FROM ${schema}_dd_entity_ids e
LEFT JOIN ${schema}_dd_table_modes m ON m.table_uuid = e.uuid
WHERE e.kind = 'table'
ORDER BY e.current_name
`)).rows;
const lockedNames = new Set(["users"]);
const modeOpts = [DATA_MODES.USER, DATA_MODES.STARTER, DATA_MODES.MANAGED];
const tablesTable = rows.length === 0
? p({ class: "text-muted" }, "No tables tracked yet")
: mkTable([
{ label: "table", key: (r) => escape(r.current_name) },
{ label: "uuid", key: (r) => code(escape(r.uuid.slice(0, 8))) },
{ label: "local id", key: (r) => escape(r.current_id) },
{ label: "data_mode", key: (r) => {
const shipped = r.starter_shipped_at ? "<br>" + span({ class: "text-muted" }, "shipped " + escape(r.starter_shipped_at)) : "";
if (lockedNames.has(r.current_name)) return span({ class: "badge bg-secondary" }, "user (locked)") + shipped;
const opts = modeOpts.map((m) => `<option value="${m}"${m === r.data_mode ? " selected" : ""}>${m}</option>`).join("");
return `<form class="d-inline" method="post" action="/admin/dev-deploy/tables/set" onsubmit="return confirm('Switching to managed/starter ADDS a hidden _dd_row_uuid column to this table and ships the current rows. Existing rows on target instances may be overwritten on next promote. Continue?');">${csrfField(req)}<input type="hidden" name="table_uuid" value="${escape(r.uuid)}"><select class="form-select form-select-sm d-inline-block w-auto me-1" name="data_mode">${opts}</select><button type="submit" class="btn btn-primary btn-sm">Set</button></form>` + shipped;
} },
{ label: "updated_at", key: (r) => escape(r.updated_at || "-") },
], rows);
const intro = `<p>Controls how each table's row content propagates between environments. Choose carefully - switching from <strong>user</strong> to <strong>managed</strong> or <strong>starter</strong> rewrites the table's schema (adds a hidden <code>_dd_row_uuid</code> column) and ships existing rows.</p>
<ul>
<li><strong>user</strong> (default) &mdash; rows belong to the local environment; deploys never touch them. The only safe choice for end-user-entered data.</li>
<li><strong>starter</strong> &mdash; rows ship to target on first install, then the target owns them; future changes on this side don't propagate. Good for default user roles, sample categories, template data the user expects to customize.</li>
<li><strong>managed</strong> &mdash; rows always sync from source. Source is canonical; target's edits get overwritten or surface as conflicts. Good for catalogs, lookup tables, anything dev-curated.</li>
</ul>
<p>The Saltcorn <code>users</code> table is locked to <strong>user</strong> and cannot be changed.</p>`;
adminPage(req, res, "tables",
{ type: "card", title: "Tables - data mode", contents: intro + tablesTable }
);
};
const tablesSet = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const tableUuid = (req.body.table_uuid || "").trim();
const dataMode = (req.body.data_mode || "").trim();
if (!tableUuid) throw new Error("table_uuid required");
const allowed = new Set(Object.values(DATA_MODES));
if (!allowed.has(dataMode)) throw new Error(`data_mode must be one of ${[...allowed].join(", ")}`);
const ent = await db.selectMaybeOne("_dd_entity_ids", { uuid: tableUuid });
if (!ent || ent.kind !== "table") throw new Error("table not found");
if (ent.current_name === "users") throw new Error("the users table is locked to data_mode=user");
const { ensureManagedSchema, dropManagedSchema, allRowsWithUuid, setRowUuid, newRowUuid, COLUMN_NAME: ROW_UUID_COL } = require("./rowIdentity");
const { rowToPortable, markStarterShipped } = require("./rowPayload");
const Table = require("@saltcorn/data/models/table");
const { randomUuid } = require("./ids");
const { enterOp } = require("./context");
const { recordOpSafely } = require("./ops");
const { refreshState } = require("./state");
await refreshState();
const prior = await db.selectMaybeOne("_dd_table_modes", { table_uuid: tableUuid });
const now = new Date().toISOString();
// Upsert mode row first.
if (prior) {
await db.updateWhere("_dd_table_modes", { data_mode: dataMode, updated_at: now, starter_shipped_at: null }, { table_uuid: tableUuid });
} else {
await db.insert("_dd_table_modes", { table_uuid: tableUuid, data_mode: dataMode, updated_at: now, starter_shipped_at: null }, { noid: true });
}
// Journal set_table_mode FIRST so target's apply sees mode change before any row ops.
{
const opId = randomUuid();
await enterOp(opId, async () => {
await recordOpSafely({
op_id: opId,
op_type: "set_table_mode",
entity_kind: "table_mode",
entity_uuid: tableUuid,
payload: { table_uuid: tableUuid, data_mode: dataMode, before_mode: (prior && prior.data_mode) || DATA_MODES.USER }
});
});
}
let initialShipped = 0;
if (dataMode === DATA_MODES.MANAGED || dataMode === DATA_MODES.STARTER) {
// Make sure THIS instance has the hidden column + UUIDs on existing rows.
await ensureManagedSchema(ent.current_name);
// Initial ship: journal an insert_row op for every existing row.
const table = Table.findOne({ id: ent.current_id });
if (table) {
const rows = await allRowsWithUuid(ent.current_name);
for (const row of rows) {
let rowUuid = row[ROW_UUID_COL];
if (!rowUuid) {
rowUuid = newRowUuid();
await setRowUuid(ent.current_name, row.id, rowUuid);
}
const { portable } = await rowToPortable(row, table);
const opId = randomUuid();
await enterOp(opId, async () => {
await recordOpSafely({
op_id: opId,
op_type: "insert_row",
entity_kind: "table_row",
entity_uuid: rowUuid,
payload: { table_uuid: tableUuid, after: portable }
});
});
initialShipped++;
}
}
// For starter: lock out further row ops.
if (dataMode === DATA_MODES.STARTER) {
await markStarterShipped(tableUuid);
}
} else if (prior && (prior.data_mode === DATA_MODES.MANAGED || prior.data_mode === DATA_MODES.STARTER)) {
// Reverting to user — drop the hidden column for cleanliness. Best-effort.
try {
await dropManagedSchema(ent.current_name);
} catch (e) {
// ignore on older SQLite that doesn't support DROP COLUMN
}
}
const summary = initialShipped > 0
? `set ${ent.current_name} to ${dataMode}; shipped ${initialShipped} rows`
: `set ${ent.current_name} to ${dataMode}`;
res.redirect("/admin/dev-deploy/tables?msg=" + encodeURIComponent(summary));
} catch (err) {
res.redirect("/admin/dev-deploy/tables?err=" + encodeURIComponent(err.message));
}
};
// ---------------- Revert ----------------
const revertView = async (req, res) => {
if (!isAdmin(req)) { res.status(403).type("text/plain").send("admin only"); return; }
try {
const opId = (req.body.op_id || "").trim();
if (!opId) throw new Error("op_id required");
const result = await revertOp(opId);
res.redirect("/admin/dev-deploy/ops?msg=" + encodeURIComponent(`reverted op ${opId.slice(0, 8)}: ${JSON.stringify(result)}`));
} catch (err) {
res.redirect("/admin/dev-deploy/ops?err=" + encodeURIComponent(err.message));
}
};
// ---------------- Machine endpoints ----------------
const apiJournal = async (req, res) => {
const peer = await requirePeerAuth(req, res);
if (!peer) return;
const since = req.query.since;
const env = await getEnv();
const schema = db.getTenantSchemaPrefix();
let sql = `SELECT op_id, source_env_id, op_type, entity_kind, entity_uuid, payload, parent_op_id, correlation_id, schema_version, created_at, status FROM ${schema}_dd_ops WHERE source_env_id = $1`;
const params = [env.env_id];
if (since) {
const anchorRow = await db.selectMaybeOne("_dd_ops", { op_id: since });
if (anchorRow) {
sql += ` AND created_at > $${params.length + 1}`;
params.push(anchorRow.created_at);
}
}
sql += ` ORDER BY created_at ASC LIMIT 1000`;
const ops = (await db.query(sql, params)).rows;
res.json({ source_env_id: env.env_id, ops: ops });
};
const apiHealth = async (req, res) => {
const peer = await requirePeerAuth(req, res);
if (!peer) return;
const env = await getEnv();
const plugins = (await db.query(`SELECT name, source, version FROM _sc_plugins ORDER BY name`)).rows;
res.json({
env_id: env.env_id,
label: env.env_label,
plugins: plugins
});
};
// Compare local plugin list with peer's. Returns array of human-readable
// warning strings (empty if all match). Best-effort: if the peer's health
// endpoint is unreachable or returns non-200, returns a single "couldn't
// reach peer's health endpoint" warning and lets the caller proceed.
const diffPluginsWithPeer = async (peerRow, env, localPlugins) => {
let r;
try {
const secret = await peers.peerSecret(peerRow.peer_id);
r = await signedFetch({
baseUrl: peerRow.base_url,
method: "GET",
path: "/dev-deploy/api/health",
body: null,
sourceEnvId: env.env_id,
secret: secret,
requireTls: peerRow.require_tls
});
} catch (e) {
return [`could not check peer plugin list: ${e.message}`];
}
if (!r.ok || !r.body || !Array.isArray(r.body.plugins)) {
return [`peer's health endpoint returned ${r.status}`];
}
const localByName = new Map(localPlugins.map((p) => [p.name, p]));
const peerByName = new Map(r.body.plugins.map((p) => [p.name, p]));
const warnings = [];
for (const [name, mine] of localByName) {
const theirs = peerByName.get(name);
if (!theirs) {
warnings.push(`peer missing plugin "${name}"`);
} else if ((mine.version || "") !== (theirs.version || "")) {
warnings.push(`plugin version mismatch on "${name}": local ${mine.version || "?"}, peer ${theirs.version || "?"}`);
}
}
for (const [name, theirs] of peerByName) {
if (!localByName.has(name)) {
warnings.push(`peer has plugin not installed here: "${name}"`);
}
}
return warnings;
};
const apiFile = async (req, res) => {
try {
const peer = await requirePeerAuth(req, res);
if (!peer) return;
const uuid = req.params.uuid;
if (!uuid) {
res.status(400).json({ error: "uuid required" });
return;
}
const mapping = await db.selectMaybeOne("_dd_entity_ids", { uuid: uuid, kind: "file" });
if (!mapping) {
res.status(404).json({ error: "file not found", uuid: uuid });
return;
}
const path = require("path");
const dbMod = require("@saltcorn/data/db");
const absPath = path.join(dbMod.connectObj.file_store, dbMod.getTenantSchema(), mapping.current_name);
res.type("application/octet-stream");
// dotfiles: 'allow' so paths containing .dev-state (etc.) aren't
// silently treated as not-found by Express's default dotfile policy.
res.sendFile(absPath, { dotfiles: "allow" }, (err) => {
if (err && !res.headersSent) {
// eslint-disable-next-line no-console
console.error(`[dev-deploy] sendFile failed for ${absPath}:`, err.message);
res.status(500).json({ error: "failed to read file: " + err.message, path: absPath });
}
});
} catch (err) {
// eslint-disable-next-line no-console
console.error(`[dev-deploy] apiFile crashed:`, err && err.stack ? err.stack : err);
if (!res.headersSent) {
res.status(500).json({ error: err.message });
}
}
};
const apiIngest = async (req, res) => {
const peer = await requirePeerAuth(req, res);
if (!peer) return;
const ops = (req.body && req.body.ops) || [];
if (!Array.isArray(ops)) {
res.status(400).json({ error: "ops must be an array" });
return;
}
const env = await getEnv();
const results = await applyBatch(ops, { peerId: peer.peer_id, myEnvId: env.env_id });
// Advance inbound anchor to the last op_id from the source side
if (ops.length > 0) {
const lastOp = ops[ops.length - 1];
const now = new Date().toISOString();
const existing = await db.selectMaybeOne("_dd_anchors", { peer_id: peer.peer_id, direction: "inbound" });
if (existing) {
await db.updateWhere("_dd_anchors", { last_op_id: lastOp.op_id, updated_at: now }, { peer_id: peer.peer_id, direction: "inbound" });
} else {
await db.insert("_dd_anchors", { peer_id: peer.peer_id, direction: "inbound", last_op_id: lastOp.op_id, updated_at: now }, { noid: true });
}
}
res.json({ received: ops.length, results: results });
};
// ---------------- Route registration ----------------
const routes = [
{ url: "/admin/dev-deploy/", method: "get", callback: dashboard },
{ url: "/admin/dev-deploy/ops", method: "get", callback: opsView },
{ url: "/admin/dev-deploy/peers", method: "get", callback: peersView },
{ url: "/admin/dev-deploy/peers/add", method: "post", callback: peersAdd },
{ url: "/admin/dev-deploy/peers/rotate", method: "post", callback: peersRotate },
{ url: "/admin/dev-deploy/peers/delete", method: "post", callback: peersDelete },
{ url: "/admin/dev-deploy/plan", method: "get", callback: planView },
{ url: "/admin/dev-deploy/promote", method: "post", callback: promote },
{ url: "/admin/dev-deploy/pull", method: "post", callback: pull },
{ url: "/admin/dev-deploy/revert", method: "post", callback: revertView },
{ url: "/admin/dev-deploy/tables", method: "get", callback: tablesView },
{ url: "/admin/dev-deploy/tables/set", method: "post", callback: tablesSet },
{ url: "/admin/dev-deploy/conflicts", method: "get", callback: conflictsView },
{ url: "/admin/dev-deploy/conflicts/resolve", method: "post", callback: conflictsResolve },
{ url: "/admin/dev-deploy/conflicts/merge", method: "get", callback: conflictsMergeView },
{ url: "/admin/dev-deploy/conflicts/merge/apply", method: "post", callback: conflictsMergeApply },
{ url: "/dev-deploy/api/journal", method: "get", callback: apiJournal, noCsrf: true },
{ url: "/dev-deploy/api/ingest", method: "post", callback: apiIngest, noCsrf: true },
{ url: "/dev-deploy/api/file/:uuid", method: "get", callback: apiFile, noCsrf: true },
{ url: "/dev-deploy/api/health", method: "get", callback: apiHealth, noCsrf: true }
];
module.exports = {
routes
};