dev-deploy/dev-deploy/lib/apply.js
2026-05-17 17:31:49 -05:00

1135 lines
44 KiB
JavaScript

// Apply handlers — replay an op from a peer onto this instance.
//
// Each handler:
//   - Resolves the op's entity_uuid (and any parent_uuid) to this instance's
//     local integer id via _dd_entity_ids.
//   - Invokes the Saltcorn model method to reproduce the change.
//   - Updates _dd_entity_ids (insert / rename / remove) accordingly.
//
// While a handler runs, the CRUD wraps are suppressed (no journal writes, no
// auto-UUID assignment) — we manage those side effects ourselves so the
// foreign op_id and entity_uuid are preserved across instances.
const db = require("@saltcorn/data/db");
const Table = require("@saltcorn/data/models/table");
const Field = require("@saltcorn/data/models/field");
const View = require("@saltcorn/data/models/view");
const Page = require("@saltcorn/data/models/page");
const Trigger = require("@saltcorn/data/models/trigger");
const Role = require("@saltcorn/data/models/role");
const Library = require("@saltcorn/data/models/library");
const Tag = require("@saltcorn/data/models/tag");
const TableConstraint = require("@saltcorn/data/models/table_constraints");
const File = require("@saltcorn/data/models/file");
const PageGroup = require("@saltcorn/data/models/page_group");
const PageGroupMember = require("@saltcorn/data/models/page_group_member");
const WorkflowStep = require("@saltcorn/data/models/workflow_step");
const {
lookupByUuid,
adoptUuid,
updateName,
removeEntityRow,
lookupByCurrent,
constraintDisplayName
} = require("./entityIds");
const { runSuppressed } = require("./context");
const { ENTITY_KINDS, fileLocationToId } = require("./constants");
const { refreshState } = require("./state");
const { sha256Buffer, writeFileBytes, toAbsolutePath } = require("./files");
const { signedFetchBinary } = require("./transport");
const peers = require("./peers");
const { getEnv } = require("./env");
const { fromPlaceholders } = require("./payloadRefs");
const {
ensureManagedSchema,
setRowUuid,
findIdByRowUuid,
COLUMN_NAME: ROW_UUID_COL
} = require("./rowIdentity");
const { portableToRow } = require("./rowPayload");
// Produce a shallow copy of `obj` without its surrogate-key properties, which
// are instance-local and do not translate across instances. `extraIdKeys`
// optionally lists further property names to omit. A null/undefined `obj`
// yields an empty object; the input is never mutated.
const stripSurrogateKeys = (obj, extraIdKeys) => {
  const excluded = new Set([
    "id",
    "table_id",
    "view_id",
    "page_id",
    "role_id_for_create",
    ...(extraIdKeys || [])
  ]);
  const source = obj || {};
  return Object.keys(source)
    .filter((key) => !excluded.has(key))
    .reduce((copy, key) => {
      copy[key] = source[key];
      return copy;
    }, {});
};
// Fetch the local mapping row for `uuid` via lookupByUuid, throwing when no
// mapping exists. `kind` is only used to label the error message; it is not
// compared against the mapping found.
const requireLocalEntity = async (uuid, kind) => {
  const mapping = await lookupByUuid(uuid);
  if (mapping) {
    return mapping;
  }
  throw new Error(`local entity for uuid=${uuid} (kind=${kind}) not found`);
};
// ---------------- Tables ----------------
// create_table: create the table described by payload.after and map the op's
// entity_uuid to the new local table id. Returns a noop result when the uuid
// is already mapped; throws when after.name is missing.
const applyCreateTable = async ({ op, payload }) => {
  const spec = payload.after || {};
  if (!spec.name) {
    throw new Error("create_table missing after.name");
  }
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  // The name is passed to Table.create separately from the other options.
  const tableOpts = stripSurrogateKeys(spec);
  const { name: tableName } = tableOpts;
  delete tableOpts.name;
  const created = await Table.create(tableName, tableOpts);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.TABLE, created.id, created.name, null);
  return { status: "created", local_id: created.id };
};
// update_table: apply payload.patch (minus surrogate keys) to the mapped
// local table, and record a rename in the mapping when the name changed.
// Throws when the uuid is unmapped or the mapped table no longer exists.
const applyUpdateTable = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.TABLE);
  const tableId = mapping.current_id;
  const table = Table.findOne({ id: tableId });
  if (!table) {
    throw new Error(`table id=${tableId} not found`);
  }
  const changes = stripSurrogateKeys(payload.patch || {});
  const hasChanges = Object.keys(changes).length > 0;
  if (!hasChanges) {
    return { status: "noop", reason: "empty patch" };
  }
  await table.update(changes);
  const renamed = changes.name !== undefined && changes.name !== mapping.current_name;
  if (renamed) {
    await updateName(ENTITY_KINDS.TABLE, tableId, changes.name);
  }
  return { status: "updated" };
};
// drop_table: delete the mapped local table (if it still exists) and remove
// its mapping row. An unmapped uuid, or a mapping whose table is already
// gone, is reported as a noop; in the latter case the stale mapping is still
// cleaned up.
const applyDropTable = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const tableId = mapping.current_id;
  const table = Table.findOne({ id: tableId });
  if (table) {
    await table.delete();
  }
  await removeEntityRow(ENTITY_KINDS.TABLE, tableId);
  return table
    ? { status: "dropped" }
    : { status: "noop", reason: "table not in saltcorn" };
};
// ---------------- Fields ----------------
// create_field: create a field on the local table identified by
// payload.parent_uuid and map the op's entity_uuid to the new field id.
// Throws when parent_uuid is missing or the parent table cannot be resolved;
// returns a noop result when the uuid is already mapped.
const applyCreateField = async ({ op, payload }) => {
  const tableUuid = payload.parent_uuid;
  if (!tableUuid) {
    throw new Error("create_field missing parent_uuid");
  }
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const tableMapping = await requireLocalEntity(tableUuid, ENTITY_KINDS.TABLE);
  const owner = Table.findOne({ id: tableMapping.current_id });
  if (!owner) {
    throw new Error(`parent table id=${tableMapping.current_id} not found`);
  }
  // The source-side table_id is stripped; substitute this instance's id.
  const fieldCfg = stripSurrogateKeys(payload.after || {});
  fieldCfg.table_id = owner.id;
  const created = await Field.create(fieldCfg);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.FIELD, created.id, created.name, tableUuid);
  return { status: "created", local_id: created.id };
};
// update_field: apply payload.patch (minus surrogate keys) to the mapped
// local field, and record a rename in the mapping when the name changed.
// Throws when the uuid is unmapped or the mapped field no longer exists.
const applyUpdateField = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.FIELD);
  const fieldId = mapping.current_id;
  const field = await Field.findOne({ id: fieldId });
  if (!field) {
    throw new Error(`field id=${fieldId} not found`);
  }
  const changes = stripSurrogateKeys(payload.patch || {});
  const hasChanges = Object.keys(changes).length > 0;
  if (!hasChanges) {
    return { status: "noop", reason: "empty patch" };
  }
  await field.update(changes);
  const renamed = changes.name !== undefined && changes.name !== mapping.current_name;
  if (renamed) {
    await updateName(ENTITY_KINDS.FIELD, fieldId, changes.name);
  }
  return { status: "updated" };
};
// drop_field: delete the mapped local field (if it still exists) and remove
// its mapping row. Reports noop when the uuid is unmapped or the field is
// already gone (the stale mapping is still removed in the latter case).
const applyDropField = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const fieldId = mapping.current_id;
  const field = await Field.findOne({ id: fieldId });
  if (field) {
    await field.delete();
  }
  await removeEntityRow(ENTITY_KINDS.FIELD, fieldId);
  return { status: field ? "dropped" : "noop" };
};
// ---------------- Views ----------------
// create_view: create a view from payload.after and map the op's entity_uuid
// to the new view id. When payload.parent_uuid is present it must resolve to
// a local table, whose id becomes the view's table_id. Returns a noop result
// when the uuid is already mapped.
const applyCreateView = async ({ op, payload }) => {
  const tableUuid = payload.parent_uuid;
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const viewCfg = stripSurrogateKeys(payload.after || {});
  if (tableUuid) {
    const tableMapping = await requireLocalEntity(tableUuid, ENTITY_KINDS.TABLE);
    viewCfg.table_id = tableMapping.current_id;
  }
  const created = await View.create(viewCfg);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.VIEW, created.id, created.name, tableUuid || null);
  return { status: "created", local_id: created.id };
};
// update_view: apply payload.patch (minus surrogate keys) to the mapped local
// view through View.update, and record a rename in the mapping when the name
// changed. Throws when the uuid is unmapped.
const applyUpdateView = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.VIEW);
  const changes = stripSurrogateKeys(payload.patch || {});
  const hasChanges = Object.keys(changes).length > 0;
  if (!hasChanges) {
    return { status: "noop", reason: "empty patch" };
  }
  // Argument order here is (changes, id), unlike Page.update / Trigger.update.
  await View.update(changes, mapping.current_id);
  const renamed = changes.name !== undefined && changes.name !== mapping.current_name;
  if (renamed) {
    await updateName(ENTITY_KINDS.VIEW, mapping.current_id, changes.name);
  }
  return { status: "updated" };
};
// drop_view: delete the mapped local view (if it still exists) and remove
// its mapping row. Reports noop when the uuid is unmapped or the view is
// already gone (the stale mapping is still removed in the latter case).
const applyDropView = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const viewId = mapping.current_id;
  const view = View.findOne({ id: viewId });
  if (view) {
    await view.delete();
  }
  await removeEntityRow(ENTITY_KINDS.VIEW, viewId);
  return { status: view ? "dropped" : "noop" };
};
// ---------------- Pages ----------------
// create_page: create a page from payload.after (minus surrogate keys) and
// map the op's entity_uuid to the new page id. Returns a noop result when
// the uuid is already mapped.
const applyCreatePage = async ({ op, payload }) => {
  const spec = payload.after || {};
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const created = await Page.create(stripSurrogateKeys(spec));
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.PAGE, created.id, created.name, null);
  return { status: "created", local_id: created.id };
};
// update_page: apply payload.patch (minus surrogate keys) to the mapped local
// page through Page.update, and record a rename in the mapping when the name
// changed. Throws when the uuid is unmapped.
const applyUpdatePage = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.PAGE);
  const changes = stripSurrogateKeys(payload.patch || {});
  const hasChanges = Object.keys(changes).length > 0;
  if (!hasChanges) {
    return { status: "noop", reason: "empty patch" };
  }
  await Page.update(mapping.current_id, changes);
  const renamed = changes.name !== undefined && changes.name !== mapping.current_name;
  if (renamed) {
    await updateName(ENTITY_KINDS.PAGE, mapping.current_id, changes.name);
  }
  return { status: "updated" };
};
// drop_page: delete the mapped local page (if it still exists) and remove
// its mapping row. Reports noop when the uuid is unmapped or the page is
// already gone (the stale mapping is still removed in the latter case).
const applyDropPage = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const pageId = mapping.current_id;
  const page = Page.findOne({ id: pageId });
  if (page) {
    await page.delete();
  }
  await removeEntityRow(ENTITY_KINDS.PAGE, pageId);
  return { status: page ? "dropped" : "noop" };
};
// ---------------- Triggers ----------------
// create_trigger: create a trigger from payload.after and map the op's
// entity_uuid to the new trigger id. When payload.parent_uuid is present it
// must resolve to a local table, whose id becomes the trigger's table_id.
// Triggers without a name are recorded in the mapping as `trigger_<id>`.
const applyCreateTrigger = async ({ op, payload }) => {
  const tableUuid = payload.parent_uuid;
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const triggerCfg = stripSurrogateKeys(payload.after || {});
  if (tableUuid) {
    const tableMapping = await requireLocalEntity(tableUuid, ENTITY_KINDS.TABLE);
    triggerCfg.table_id = tableMapping.current_id;
  }
  const created = await Trigger.create(triggerCfg);
  const mappedName = created.name || `trigger_${created.id}`;
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.TRIGGER, created.id, mappedName, tableUuid || null);
  return { status: "created", local_id: created.id };
};
// update_trigger: apply payload.patch (minus surrogate keys) to the mapped
// local trigger through Trigger.update, and record a rename in the mapping
// when the name changed. Throws when the uuid is unmapped.
const applyUpdateTrigger = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.TRIGGER);
  const changes = stripSurrogateKeys(payload.patch || {});
  const hasChanges = Object.keys(changes).length > 0;
  if (!hasChanges) {
    return { status: "noop", reason: "empty patch" };
  }
  await Trigger.update(mapping.current_id, changes);
  const renamed = changes.name !== undefined && changes.name !== mapping.current_name;
  if (renamed) {
    await updateName(ENTITY_KINDS.TRIGGER, mapping.current_id, changes.name);
  }
  return { status: "updated" };
};
// drop_trigger: delete the mapped local trigger (if it still exists) and
// remove its mapping row. Reports noop when the uuid is unmapped or the
// trigger is already gone (the stale mapping is still removed in the latter
// case).
const applyDropTrigger = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const triggerId = mapping.current_id;
  const trigger = Trigger.findOne({ id: triggerId });
  if (trigger) {
    await trigger.delete();
  }
  await removeEntityRow(ENTITY_KINDS.TRIGGER, triggerId);
  return { status: trigger ? "dropped" : "noop" };
};
// ---------------- Roles ----------------
// create_role: create a role with the id and name given in payload.after and
// map the op's entity_uuid to it. Unlike the other create handlers, the role
// id from the payload is used as-is rather than being stripped. Throws when
// after.id or after.role is missing.
const applyCreateRole = async ({ op, payload }) => {
  const { id: roleId, role: roleName } = payload.after || {};
  if (!roleId || !roleName) {
    throw new Error("create_role missing after.id or after.role");
  }
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  await Role.create({ id: roleId, role: roleName });
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.ROLE, roleId, roleName, null);
  return { status: "created", local_id: roleId };
};
// update_role: apply payload.patch to the mapped local role and record a
// rename in the mapping when patch.role changed.
//
// Returns { status: "noop" } for an empty patch and { status: "updated" }
// otherwise. Throws when the uuid is unmapped, or when the mapping points at
// a role that no longer exists (previously this surfaced as a TypeError from
// calling .update on null).
const applyUpdateRole = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.ROLE);
  const r = await Role.findOne({ id: mapping.current_id });
  // The patch is applied verbatim: surrogate keys are not stripped here.
  const patch = payload.patch || {};
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  if (!r) {
    throw new Error(`role id=${mapping.current_id} not found`);
  }
  await r.update(patch);
  // For roles the display name tracked in the mapping is the `role` column.
  if (patch.role !== undefined && patch.role !== mapping.current_name) {
    await updateName(ENTITY_KINDS.ROLE, mapping.current_id, patch.role);
  }
  return { status: "updated" };
};
// drop_role: delete the mapped local role when it still exists, then remove
// the mapping row. An unmapped uuid is a noop; a mapped uuid always reports
// "dropped", even if the role itself was already gone.
const applyDropRole = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (mapping) {
    const roleId = mapping.current_id;
    const role = await Role.findOne({ id: roleId });
    if (role) {
      await role.delete();
    }
    await removeEntityRow(ENTITY_KINDS.ROLE, roleId);
    return { status: "dropped" };
  }
  return { status: "noop" };
};
// ---------------- Library ----------------
// create_library: create a library entry from payload.after and map the op's
// entity_uuid to it. The result of Library.create is not used; the new row is
// re-read by name to obtain its id. Throws when after.name is missing or the
// row cannot be found after creation.
const applyCreateLibrary = async ({ op, payload }) => {
  const spec = payload.after || {};
  if (!spec.name) {
    throw new Error("create_library missing after.name");
  }
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  await Library.create(stripSurrogateKeys(spec));
  const created = await Library.findOne({ name: spec.name });
  if (!created) {
    throw new Error("library not found after create");
  }
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.LIBRARY, created.id, created.name, null);
  return { status: "created", local_id: created.id };
};
// update_library: apply payload.patch (minus surrogate keys) to the mapped
// local library entry and record a rename in the mapping when the name
// changed.
//
// Returns { status: "noop" } for an empty patch and { status: "updated" }
// otherwise. Throws when the uuid is unmapped, or when the mapping points at
// a library entry that no longer exists (previously this surfaced as a
// TypeError from calling .update on null).
const applyUpdateLibrary = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.LIBRARY);
  const li = await Library.findOne({ id: mapping.current_id });
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  if (!li) {
    throw new Error(`library id=${mapping.current_id} not found`);
  }
  await li.update(patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.LIBRARY, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};
// drop_library: delete the mapped local library entry when it still exists,
// then remove the mapping row. An unmapped uuid is a noop; a mapped uuid
// always reports "dropped".
const applyDropLibrary = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (mapping) {
    const libraryId = mapping.current_id;
    const entry = await Library.findOne({ id: libraryId });
    if (entry) {
      await entry.delete();
    }
    await removeEntityRow(ENTITY_KINDS.LIBRARY, libraryId);
    return { status: "dropped" };
  }
  return { status: "noop" };
};
// ---------------- Tags ----------------
// create_tag: create a tag from payload.after (minus surrogate keys) and map
// the op's entity_uuid to the new tag id. Returns a noop result when the uuid
// is already mapped.
const applyCreateTag = async ({ op, payload }) => {
  const spec = payload.after || {};
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const created = await Tag.create(stripSurrogateKeys(spec));
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.TAG, created.id, created.name, null);
  return { status: "created", local_id: created.id };
};
// update_tag: apply payload.patch (minus surrogate keys) to the mapped local
// tag and record a rename in the mapping when the name changed.
//
// Returns { status: "noop" } for an empty patch and { status: "updated" }
// otherwise. Throws when the uuid is unmapped, or when the mapping points at
// a tag that no longer exists (previously this surfaced as a TypeError from
// calling .update on null).
const applyUpdateTag = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.TAG);
  const tg = await Tag.findOne({ id: mapping.current_id });
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  if (!tg) {
    throw new Error(`tag id=${mapping.current_id} not found`);
  }
  await tg.update(patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.TAG, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};
// drop_tag: delete the mapped local tag when it still exists, then remove the
// mapping row. An unmapped uuid is a noop; a mapped uuid always reports
// "dropped".
const applyDropTag = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (mapping) {
    const tagId = mapping.current_id;
    const tag = await Tag.findOne({ id: tagId });
    if (tag) {
      await tag.delete();
    }
    await removeEntityRow(ENTITY_KINDS.TAG, tagId);
    return { status: "dropped" };
  }
  return { status: "noop" };
};
// ---------------- Files ----------------
// create_file: fetch the file's bytes from the originating peer, verify them
// against payload.after.content_hash, write them under the local path derived
// from after.relative_path, register the file with Saltcorn, and map the op's
// entity_uuid.
//
// Requires opts.peerId (the peer to download from). Throws on missing
// relative_path / content_hash / peerId, an unknown peer, a failed fetch, or
// a hash mismatch. Returns a noop result when the uuid is already mapped.
const applyCreateFile = async ({ op, payload, opts }) => {
  const meta = payload.after || {};
  const relPath = meta.relative_path;
  if (!relPath) {
    throw new Error("create_file missing relative_path");
  }
  if (!meta.content_hash) {
    throw new Error("create_file missing content_hash");
  }
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  if (!opts || !opts.peerId) {
    throw new Error("file apply requires opts.peerId to fetch binary from");
  }

  // Download the binary from the peer with a signed request.
  const peer = await peers.findPeer(opts.peerId);
  if (!peer) {
    throw new Error(`peer ${opts.peerId} not found`);
  }
  const secret = await peers.peerSecret(opts.peerId);
  const env = await getEnv();
  const response = await signedFetchBinary({
    baseUrl: peer.base_url,
    method: "GET",
    path: `/dev-deploy/api/file/${encodeURIComponent(op.entity_uuid)}`,
    body: null,
    sourceEnvId: env.env_id,
    secret: secret
  });
  if (!response.ok) {
    throw new Error(`binary fetch returned ${response.status}`);
  }

  // Refuse to write anything whose digest differs from the journaled one.
  const bytes = response.bytes;
  const actualHash = sha256Buffer(bytes);
  if (actualHash !== meta.content_hash) {
    throw new Error(`content hash mismatch: expected ${meta.content_hash}, got ${actualHash}`);
  }

  const absPath = toAbsolutePath(File, db, relPath);
  await writeFileBytes(absPath, bytes);
  await File.create({
    filename: meta.filename,
    location: absPath,
    uploaded_at: new Date(),
    size_kb: meta.size_kb,
    mime_super: meta.mime_super,
    mime_sub: meta.mime_sub,
    min_role_read: meta.min_role_read
  });

  // The mapping id for a file is derived from its relative path rather than
  // taken from the record File.create returns.
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.FILE, fileLocationToId(relPath), relPath, null);
  return { status: "created", location: absPath, bytes: bytes.length };
};
// drop_file: delete the local file whose location is derived from the
// mapping's current_name (the relative path stored at create time), then
// remove the mapping row. An unmapped uuid is a noop.
const applyDropFile = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const location = toAbsolutePath(File, db, mapping.current_name);
  const localFile = await File.findOne({ location: location });
  if (localFile) {
    await localFile.delete();
  }
  await removeEntityRow(ENTITY_KINDS.FILE, mapping.current_id);
  return { status: "dropped" };
};
// ---------------- TableConstraints ----------------
// create_constraint: create a table constraint of after.type on the local
// table identified by payload.parent_uuid, and map the op's entity_uuid to
// it. Only type and configuration are taken from the payload. Throws when
// parent_uuid or type is missing, or the parent table is unmapped.
const applyCreateConstraint = async ({ op, payload }) => {
  const spec = payload.after || {};
  const tableUuid = payload.parent_uuid;
  if (!tableUuid) {
    throw new Error("create_constraint missing parent_uuid");
  }
  if (!spec.type) {
    throw new Error("create_constraint missing type");
  }
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const tableMapping = await requireLocalEntity(tableUuid, "table");
  const created = await TableConstraint.create({
    table_id: tableMapping.current_id,
    type: spec.type,
    configuration: spec.configuration || {}
  });
  await adoptUuid(op.entity_uuid, "constraint", created.id, constraintDisplayName(created), tableUuid);
  return { status: "created", local_id: created.id };
};
// drop_constraint: delete the mapped local constraint when it still exists,
// then remove the mapping row. An unmapped uuid is a noop.
const applyDropConstraint = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const constraintId = mapping.current_id;
  const constraint = await TableConstraint.findOne({ id: constraintId });
  if (constraint) {
    await constraint.delete();
  }
  await removeEntityRow("constraint", constraintId);
  return { status: "dropped" };
};
// ---------------- PageGroup ----------------
// create_page_group: create a page group from payload.after, attach the
// members listed in payload.members, and map the op's entity_uuid to the new
// group id.
//
// Every member's page_uuid is resolved to a local page id BEFORE the group is
// inserted, so a malformed or unmapped member fails the op without leaving
// behind a half-built group that has no uuid mapping (which a retry would
// then duplicate).
//
// Throws when after.name is missing, a member lacks page_uuid, or a member's
// page_uuid is not mapped locally. Returns a noop result when the uuid is
// already mapped.
const applyCreatePageGroup = async ({ op, payload }) => {
  const after = payload.after || {};
  if (!after.name) throw new Error("create_page_group missing after.name");
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  // Validate and translate all members up front; nothing is written yet.
  const members = [];
  for (const m of payload.members || []) {
    if (!m.page_uuid) {
      throw new Error("page_group member missing page_uuid");
    }
    const local = await lookupByUuid(m.page_uuid);
    if (!local) {
      throw new Error(`page_group member references unmapped page_uuid=${m.page_uuid}`);
    }
    members.push({
      page_id: local.current_id,
      eligible_formula: m.eligible_formula,
      description: m.description,
      sequence: m.sequence
    });
  }
  // Create the group first, empty members. PageGroupMember's constructor
  // demands page_group_id which doesn't exist until after the insert.
  const cfg = stripSurrogateKeys(after);
  cfg.members = [];
  const result = await PageGroup.create(cfg);
  for (const member of members) {
    await result.addMember(member);
  }
  await adoptUuid(op.entity_uuid, "page_group", result.id, result.name, null);
  return { status: "created", local_id: result.id };
};
// update_page_group: apply payload.patch (minus surrogate keys) to the mapped
// local page group through PageGroup.update, and record a rename in the
// mapping when the name changed. Throws when the uuid is unmapped.
const applyUpdatePageGroup = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, "page_group");
  const changes = stripSurrogateKeys(payload.patch || {});
  const hasChanges = Object.keys(changes).length > 0;
  if (!hasChanges) {
    return { status: "noop", reason: "empty patch" };
  }
  await PageGroup.update(mapping.current_id, changes);
  const renamed = changes.name !== undefined && changes.name !== mapping.current_name;
  if (renamed) {
    await updateName("page_group", mapping.current_id, changes.name);
  }
  return { status: "updated" };
};
// drop_page_group: delete the mapped local page group when it still exists,
// then remove the mapping row. An unmapped uuid is a noop.
const applyDropPageGroup = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop" };
  }
  const groupId = mapping.current_id;
  const group = PageGroup.findOne({ id: groupId });
  if (group) {
    await group.delete();
  }
  await removeEntityRow("page_group", groupId);
  return { status: "dropped" };
};
// ---------------- Table rows (managed/starter) ----------------
// Resolve a table uuid to the local Table. Returns null when the uuid is
// unmapped or mapped to something whose kind is not "table". State is
// refreshed before the table lookup.
const findLocalTableByUuid = async (tableUuid) => {
  const mapping = await lookupByUuid(tableUuid);
  const isTable = Boolean(mapping) && mapping.kind === "table";
  if (!isTable) {
    return null;
  }
  await refreshState();
  return Table.findOne({ id: mapping.current_id });
};
// insert_row: insert payload.after into the local table identified by
// payload.table_uuid and stamp the new row with the op's entity_uuid (the
// row uuid). Throws on a missing table_uuid / row uuid, an unresolvable
// table, or an insert that yields no id.
const applyInsertRow = async ({ op, payload }) => {
  if (!(payload && payload.table_uuid)) {
    throw new Error("insert_row missing table_uuid");
  }
  const rowUuid = op.entity_uuid;
  if (!rowUuid) {
    throw new Error("insert_row missing row_uuid (entity_uuid)");
  }
  const table = await findLocalTableByUuid(payload.table_uuid);
  if (!table) {
    throw new Error(`local table for uuid=${payload.table_uuid} not found`);
  }
  await ensureManagedSchema(table.name);
  // Idempotency guard: a row already carrying this uuid means the op was
  // applied before.
  const alreadyPresent = await findIdByRowUuid(table.name, rowUuid);
  if (alreadyPresent) {
    return { status: "noop", reason: "row uuid already present" };
  }
  const values = await portableToRow(payload.after || {}, table);
  const insertedId = await table.insertRow(values);
  if (!insertedId) {
    throw new Error("insertRow returned no id");
  }
  await setRowUuid(table.name, insertedId, rowUuid);
  return { status: "inserted", local_id: insertedId };
};
// update_row: apply payload.patch to the local row carrying the op's
// entity_uuid (the row uuid) in the table identified by payload.table_uuid.
//
// If no local row has that uuid yet, the patch is inserted as a new row and
// stamped with the uuid (status "inserted_for_update"). Otherwise the row is
// updated, or the op is a noop when the translated patch is empty.
//
// Throws on a missing table_uuid / row uuid, an unresolvable table, or a
// fallback insert that yields no id (so a uuid is never stamped onto an
// undefined row id).
const applyUpdateRow = async ({ op, payload }) => {
  if (!payload || !payload.table_uuid) throw new Error("update_row missing table_uuid");
  if (!op.entity_uuid) throw new Error("update_row missing row_uuid");
  const tbl = await findLocalTableByUuid(payload.table_uuid);
  if (!tbl) throw new Error(`local table for uuid=${payload.table_uuid} not found`);
  await ensureManagedSchema(tbl.name);
  const localId = await findIdByRowUuid(tbl.name, op.entity_uuid);
  if (!localId) {
    // The row doesn't exist on target yet — treat as insert
    const rowData = await portableToRow(payload.patch || {}, tbl);
    const newId = await tbl.insertRow(rowData);
    if (!newId) throw new Error("insertRow returned no id");
    await setRowUuid(tbl.name, newId, op.entity_uuid);
    return { status: "inserted_for_update", local_id: newId };
  }
  const patch = await portableToRow(payload.patch || {}, tbl);
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await tbl.updateRow(patch, localId);
  return { status: "updated", local_id: localId };
};
// drop_row: delete the local row carrying the op's entity_uuid (the row uuid)
// from the table identified by payload.table_uuid. A missing table or row is
// a noop; a missing table_uuid or row uuid throws.
const applyDropRow = async ({ op, payload }) => {
  if (!(payload && payload.table_uuid)) {
    throw new Error("drop_row missing table_uuid");
  }
  const rowUuid = op.entity_uuid;
  if (!rowUuid) {
    throw new Error("drop_row missing row_uuid");
  }
  const table = await findLocalTableByUuid(payload.table_uuid);
  if (!table) {
    return { status: "noop", reason: "table not present locally" };
  }
  const rowId = await findIdByRowUuid(table.name, rowUuid);
  if (!rowId) {
    return { status: "noop", reason: "row uuid not present locally" };
  }
  await table.deleteRows({ id: rowId });
  return { status: "dropped", local_id: rowId };
};
// set_table_mode: record payload.data_mode (default "user") for the table in
// _dd_table_modes, updating the existing row or inserting a new one. For the
// "managed" and "starter" modes the table's managed schema is ensured first.
// Throws when table_uuid is missing or the table cannot be resolved locally.
const applySetTableMode = async ({ op, payload }) => {
  if (!(payload && payload.table_uuid)) {
    throw new Error("set_table_mode missing table_uuid");
  }
  const tableUuid = payload.table_uuid;
  const table = await findLocalTableByUuid(tableUuid);
  if (!table) {
    throw new Error(`local table for uuid=${tableUuid} not found`);
  }
  const mode = payload.data_mode || "user";
  if (["managed", "starter"].includes(mode)) {
    await ensureManagedSchema(table.name);
  }
  const now = new Date().toISOString();
  const current = await db.selectMaybeOne("_dd_table_modes", { table_uuid: tableUuid });
  if (!current) {
    await db.insert("_dd_table_modes", { table_uuid: tableUuid, data_mode: mode, updated_at: now }, { noid: true });
  } else {
    await db.updateWhere("_dd_table_modes", { data_mode: mode, updated_at: now }, { table_uuid: tableUuid });
  }
  return { status: "set", mode: mode };
};
// ---------------- Plugin configuration ----------------
// update_plugin_config: replace the configuration of the installed plugin
// named payload.name with payload.configuration (or {} when absent). Ops
// targeting "dev-deploy" itself are skipped. Throws when payload.name is
// missing or the plugin is not installed here.
const applyUpdatePluginConfig = async ({ op, payload }) => {
  // Loaded lazily inside the handler rather than at module load.
  const Plugin = require("@saltcorn/data/models/plugin");
  const pluginName = payload ? payload.name : undefined;
  if (!pluginName) {
    throw new Error("update_plugin_config missing payload.name");
  }
  if (pluginName === "dev-deploy") {
    return { status: "noop", reason: "skipping our own plugin" };
  }
  const installed = await Plugin.findOne({ name: pluginName });
  if (!installed) {
    throw new Error(`plugin "${pluginName}" is not installed on this instance`);
  }
  installed.configuration = payload.configuration || {};
  await installed.upsert();
  return { status: "updated", plugin: pluginName };
};
// ---------------- Config (menu_items etc.) ----------------
// set_config: store payload.value under payload.key via the Saltcorn state's
// setConfig. Throws when the key is missing.
const applySetConfig = async ({ op, payload }) => {
  const entry = payload || {};
  const key = entry.key;
  if (!key) {
    throw new Error("set_config missing key");
  }
  // Loaded lazily inside the handler rather than at module load.
  const { getState } = require("@saltcorn/data/db/state");
  await getState().setConfig(key, entry.value);
  return { status: "set", key: key };
};
// ---------------- PageGroupMember ----------------
// create_page_group_member: add the page identified by after.page_uuid to
// the page group identified by payload.parent_uuid, and map the op's
// entity_uuid to the new member id (recorded under the name `member_<id>`).
// Throws when parent_uuid or page_uuid is missing, or when the group or page
// cannot be resolved locally.
// NOTE(review): unlike create_page_group, no `sequence` is forwarded to
// addMember here — confirm that is intended.
const applyCreatePageGroupMember = async ({ op, payload }) => {
  const spec = payload.after || {};
  const groupUuid = payload.parent_uuid;
  if (!groupUuid) {
    throw new Error("create_page_group_member missing parent_uuid");
  }
  if (!spec.page_uuid) {
    throw new Error("create_page_group_member missing page_uuid");
  }
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const groupMapping = await requireLocalEntity(groupUuid, "page_group");
  const pageMapping = await requireLocalEntity(spec.page_uuid, "page");
  const group = PageGroup.findOne({ id: groupMapping.current_id });
  if (!group) {
    throw new Error(`local page_group id=${groupMapping.current_id} not found`);
  }
  const created = await group.addMember({
    page_id: pageMapping.current_id,
    eligible_formula: spec.eligible_formula,
    description: spec.description
  });
  await adoptUuid(op.entity_uuid, "page_group_member", created.id, `member_${created.id}`, groupUuid);
  return { status: "created", local_id: created.id };
};
// drop_page_group_member: delete the mapped page-group member by id and
// remove its mapping row. An unmapped uuid is a noop.
const applyDropPageGroupMember = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (mapping) {
    const memberId = mapping.current_id;
    await PageGroupMember.delete(memberId);
    await removeEntityRow("page_group_member", memberId);
    return { status: "dropped" };
  }
  return { status: "noop" };
};
// ---------------- WorkflowStep ----------------
// create_workflow_step: create a workflow step from payload.after and map the
// op's entity_uuid to it. When payload.parent_uuid is present it must resolve
// to a local trigger, whose id becomes the step's trigger_id. Steps without a
// name are recorded in the mapping as `step_<id>`.
const applyCreateWorkflowStep = async ({ op, payload }) => {
  const spec = payload.after || {};
  const triggerUuid = payload.parent_uuid;
  if (await lookupByUuid(op.entity_uuid)) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const triggerId = triggerUuid
    ? (await requireLocalEntity(triggerUuid, "trigger")).current_id
    : null;
  const stepCfg = stripSurrogateKeys(spec);
  if (triggerId) {
    stepCfg.trigger_id = triggerId;
  }
  const created = await WorkflowStep.create(stepCfg);
  // Accept either a bare numeric id or an object carrying `.id`.
  const stepId = typeof created === "number" ? created : (created && created.id);
  if (!stepId) {
    throw new Error("WorkflowStep.create returned no id");
  }
  await adoptUuid(op.entity_uuid, "workflow_step", stepId, spec.name || `step_${stepId}`, triggerUuid || null);
  return { status: "created", local_id: stepId };
};
// update_workflow_step: apply payload.patch (minus surrogate keys) to the
// mapped local workflow step, and record a rename in the mapping when the
// name changed. Throws when the uuid is unmapped or the step no longer
// exists.
const applyUpdateWorkflowStep = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, "workflow_step");
  const stepId = mapping.current_id;
  const step = await WorkflowStep.findOne({ id: stepId });
  if (!step) {
    throw new Error(`workflow_step id=${stepId} not found`);
  }
  const changes = stripSurrogateKeys(payload.patch || {});
  const hasChanges = Object.keys(changes).length > 0;
  if (!hasChanges) {
    return { status: "noop", reason: "empty patch" };
  }
  await step.update(changes);
  const renamed = changes.name !== undefined && changes.name !== mapping.current_name;
  if (renamed) {
    await updateName("workflow_step", stepId, changes.name);
  }
  return { status: "updated" };
};
// drop_workflow_step: delete the mapped local workflow step when it still
// exists, then remove the mapping row. An unmapped uuid is a noop.
const applyDropWorkflowStep = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop" };
  }
  const stepId = mapping.current_id;
  const step = await WorkflowStep.findOne({ id: stepId });
  if (step) {
    await step.delete();
  }
  await removeEntityRow("workflow_step", stepId);
  return { status: "dropped" };
};
// ---------------- Dispatch ----------------
// Dispatch table: maps an op's op_type string to the handler that replays it.
// Every handler is called with a single { op, payload, opts } object. An
// op_type with no entry here is journaled as an error by applyBatch.
const HANDLERS = {
// Tables, fields, views, pages, triggers
create_table: applyCreateTable,
update_table: applyUpdateTable,
drop_table: applyDropTable,
create_field: applyCreateField,
update_field: applyUpdateField,
drop_field: applyDropField,
create_view: applyCreateView,
update_view: applyUpdateView,
drop_view: applyDropView,
create_page: applyCreatePage,
update_page: applyUpdatePage,
drop_page: applyDropPage,
create_trigger: applyCreateTrigger,
update_trigger: applyUpdateTrigger,
drop_trigger: applyDropTrigger,
// Roles, library entries, tags
create_role: applyCreateRole,
update_role: applyUpdateRole,
drop_role: applyDropRole,
create_library: applyCreateLibrary,
update_library: applyUpdateLibrary,
drop_library: applyDropLibrary,
create_tag: applyCreateTag,
update_tag: applyUpdateTag,
drop_tag: applyDropTag,
// Constraints and files have no update op.
create_constraint: applyCreateConstraint,
drop_constraint: applyDropConstraint,
create_file: applyCreateFile,
drop_file: applyDropFile,
// Page groups and their members (members have no update op)
create_page_group: applyCreatePageGroup,
update_page_group: applyUpdatePageGroup,
drop_page_group: applyDropPageGroup,
create_page_group_member: applyCreatePageGroupMember,
drop_page_group_member: applyDropPageGroupMember,
// Workflow steps
create_workflow_step: applyCreateWorkflowStep,
update_workflow_step: applyUpdateWorkflowStep,
drop_workflow_step: applyDropWorkflowStep,
// Instance config and plugin configuration
set_config: applySetConfig,
update_plugin_config: applyUpdatePluginConfig,
// Table rows and per-table data mode
insert_row: applyInsertRow,
update_row: applyUpdateRow,
drop_row: applyDropRow,
set_table_mode: applySetTableMode
};
// Journal an incoming op in _dd_ops, keeping its source-side identity
// (op_id, source_env_id, created_at, ...) intact.
//
// `status` is stored as given; applyBatch uses 'committed' (apply
// succeeded), 'skipped_cascade' (parent in same batch handled it), 'error'
// (apply failed) and 'conflict' (a local op touched the same entity since
// the last sync with this peer). `applied` decides whether applied_at is
// stamped with the current time or left null. `extra.conflict_with_op_id`,
// when given, is stored alongside.
const persistOp = async (op, status, applied, extra) => {
  // The payload column holds JSON text; pass strings through untouched.
  const serializedPayload =
    typeof op.payload === "string" ? op.payload : JSON.stringify(op.payload || {});
  const appliedAt = applied ? new Date().toISOString() : null;
  const conflictWith = (extra && extra.conflict_with_op_id) || null;
  await db.insert(
    "_dd_ops",
    {
      op_id: op.op_id,
      source_env_id: op.source_env_id,
      op_type: op.op_type,
      entity_kind: op.entity_kind,
      entity_uuid: op.entity_uuid,
      payload: serializedPayload,
      parent_op_id: op.parent_op_id,
      correlation_id: op.correlation_id,
      schema_version: op.schema_version || 1,
      created_at: op.created_at,
      applied_at: appliedAt,
      status: status,
      conflict_with_op_id: conflictWith
    },
    { noid: true }
  );
};
// Detect a concurrent local change to the entity an incoming op targets.
//
// The cutoff is the applied_at of the op referenced by this peer's inbound
// anchor (or the epoch when there is no anchor, or its op has no applied_at).
// Any op from our own environment on the same entity_uuid applied after that
// cutoff, and not rejected/reverted, counts as a conflict.
//
// Returns the op_id of the most recently applied such local op, or null when
// there is none or when op.entity_uuid / opts.peerId / opts.myEnvId is
// missing.
const findConflictingLocalOp = async (op, opts) => {
  const canCheck = Boolean(op.entity_uuid && opts.peerId && opts.myEnvId);
  if (!canCheck) {
    return null;
  }
  const inboundAnchor = await db.selectMaybeOne("_dd_anchors", { peer_id: opts.peerId, direction: "inbound" });
  const anchorOp = inboundAnchor
    ? await db.selectMaybeOne("_dd_ops", { op_id: inboundAnchor.last_op_id })
    : null;
  const cutoff = anchorOp && anchorOp.applied_at
    ? anchorOp.applied_at
    : "1970-01-01T00:00:00.000Z";
  const { rows } = await db.query(
    `SELECT op_id FROM _dd_ops
WHERE entity_uuid = $1
AND source_env_id = $2
AND applied_at IS NOT NULL
AND applied_at > $3
AND status NOT IN ('rejected', 'reverted')
ORDER BY applied_at DESC
LIMIT 1`,
    [op.entity_uuid, opts.myEnvId, cutoff]
  );
  return rows.length > 0 ? rows[0].op_id : null;
};
// Apply a batch of ops in created_at order. Children whose parent_op_id is in
// the same batch are journaled but not re-applied -- their parent's apply will
// reproduce the cascade locally.
//
// opts.peerId + opts.myEnvId enable conflict detection: if an incoming op's
// entity_uuid has a local op applied since the last sync with this peer, the
// incoming op is journaled with status='conflict' instead of being applied.
// The admin resolves via /admin/dev-deploy/conflicts.
//
// Parameters:
//   ops  - the ops received from a peer; a missing value is treated as an
//          empty batch.
//   opts - optional; forwarded to findConflictingLocalOp and to every handler.
//
// Returns one entry per op, in processing order:
//   { op_id, status } with status "already_applied" or "skipped_cascade",
//   { op_id, status: "conflict", conflict_with },
//   { op_id, status: "applied", detail }, or
//   { op_id, status: "error", error }.
// A failing op never aborts the batch; it is journaled with status 'error'.
const applyBatch = async (ops, opts) => {
  const options = opts || {};
  // A handler may reject with a non-Error value (even null/undefined); turn
  // whatever was thrown into text without throwing again.
  const messageOf = (err) =>
    err && typeof err.message === "string" ? err.message : String(err);

  await refreshState();
  const sorted = [...(ops || [])].sort((a, b) =>
    String(a.created_at).localeCompare(String(b.created_at))
  );
  const opIdSet = new Set(sorted.map((o) => o.op_id));
  const results = [];

  for (const op of sorted) {
    // Idempotency: any op_id already present in the journal is left alone,
    // whatever status it was recorded with.
    const existing = await db.selectMaybeOne("_dd_ops", { op_id: op.op_id });
    if (existing) {
      results.push({ op_id: op.op_id, status: "already_applied" });
      continue;
    }

    if (op.parent_op_id && opIdSet.has(op.parent_op_id)) {
      await persistOp(op, "skipped_cascade", true);
      results.push({ op_id: op.op_id, status: "skipped_cascade" });
      continue;
    }

    const conflictWith = await findConflictingLocalOp(op, options);
    if (conflictWith) {
      await persistOp(op, "conflict", false, { conflict_with_op_id: conflictWith });
      results.push({ op_id: op.op_id, status: "conflict", conflict_with: conflictWith });
      continue;
    }

    const handler = HANDLERS[op.op_type];
    if (!handler) {
      await persistOp(op, "error", true);
      results.push({ op_id: op.op_id, status: "error", error: `no handler for ${op.op_type}` });
      continue;
    }

    // op_id is only shortened for log output; String() keeps a non-string id
    // from throwing inside the error path.
    const shortId = String(op.op_id).slice(0, 8);
    try {
      const payload = typeof op.payload === "string" ? JSON.parse(op.payload) : (op.payload || {});
      // Resolve __dd_file_ref::<uuid> placeholders to local file paths
      // before handing the payload to the model handler. This stays
      // best-effort: a failure is reported and the op is still applied.
      try {
        await fromPlaceholders(payload);
      } catch (refErr) {
        // eslint-disable-next-line no-console
        console.warn(`[dev-deploy] placeholder resolution failed (op=${shortId}):`, messageOf(refErr));
      }
      const r = await runSuppressed(() => handler({ op: op, payload: payload, opts: options }));
      await persistOp(op, "committed", true);
      results.push({ op_id: op.op_id, status: "applied", detail: r });
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error(`[dev-deploy] apply ${op.op_type} (op=${shortId}) failed:`, err && err.stack ? err.stack : err);
      await persistOp(op, "error", true);
      results.push({ op_id: op.op_id, status: "error", error: messageOf(err) });
    }
  }
  return results;
};
// Resolve a pending conflict.
// action='theirs': apply the conflicting incoming op now (suppressed),
//                  mark status='committed', clear conflict_with_op_id.
//                  The stored payload goes through the same __dd_file_ref
//                  placeholder resolution as in applyBatch, and the handler
//                  receives an (empty) opts object, so handlers see the same
//                  argument shape on both paths.
// action='mine'  : mark the incoming op status='rejected', clear conflict_with.
//                  The local op stays as-is. The peer keeps sending this op_id
//                  on future pulls; we skip-by-id-already-present at the top of
//                  applyBatch (status check covers 'rejected' too via idempotency).
//
// Parameters:
//   opId   - op_id of a row in _dd_ops whose status is 'conflict'.
//   action - 'theirs' or 'mine'.
// Returns { status: "rejected" } for 'mine' and
//         { status: "applied", detail } for 'theirs', where detail is the
//         handler's return value.
// Throws when the op does not exist, is not in conflict status, has no
// handler ('theirs' only), or the action is unknown. A handler failure
// propagates and leaves the row in conflict status.
const resolveConflict = async (opId, action) => {
  const op = await db.selectMaybeOne("_dd_ops", { op_id: opId });
  if (!op) throw new Error(`op ${opId} not found`);
  if (op.status !== "conflict") throw new Error(`op ${opId} is not in conflict status (status=${op.status})`);

  if (action === "mine") {
    await db.updateWhere("_dd_ops", {
      status: "rejected",
      conflict_with_op_id: null,
      applied_at: new Date().toISOString()
    }, { op_id: opId });
    return { status: "rejected" };
  }

  if (action === "theirs") {
    const handler = HANDLERS[op.op_type];
    if (!handler) throw new Error(`no handler for ${op.op_type}`);
    await refreshState();
    const payload = typeof op.payload === "string" ? JSON.parse(op.payload) : (op.payload || {});
    // The journaled payload still carries __dd_file_ref::<uuid> placeholders
    // (it was stored before any resolution). Resolve them as applyBatch does;
    // best-effort, so a failure is reported but does not block the apply.
    try {
      await fromPlaceholders(payload);
    } catch (refErr) {
      // eslint-disable-next-line no-console
      console.warn(
        `[dev-deploy] placeholder resolution failed (op=${String(op.op_id).slice(0, 8)}):`,
        refErr && typeof refErr.message === "string" ? refErr.message : String(refErr)
      );
    }
    const r = await runSuppressed(() => handler({ op: op, payload: payload, opts: {} }));
    await db.updateWhere("_dd_ops", {
      status: "committed",
      conflict_with_op_id: null,
      applied_at: new Date().toISOString()
    }, { op_id: opId });
    return { status: "applied", detail: r };
  }

  throw new Error(`unknown action '${action}', expected 'theirs' or 'mine'`);
};
// Find the entity instance for a conflict op's entity_uuid.
//
// The entity kind is the part of op.op_type after the first underscore
// (e.g. "update_view" -> "view"). Only table, field, view, page, trigger,
// role, library and tag are supported; any other kind yields null before any
// database lookup is made.
//
// Returns { instance, kind }, or null when the op lacks op_type/entity_uuid,
// the kind is unsupported, the uuid has no local mapping, or the mapped row
// no longer exists.
const findEntityForConflict = async (op) => {
  if (!op.op_type || !op.entity_uuid) return null;
  const sep = op.op_type.indexOf("_");
  if (sep < 0) return null;
  const kind = op.op_type.substring(sep + 1);

  // A Map (rather than an object literal) so that kinds such as "toString"
  // or "constructor" cannot resolve to an inherited Object.prototype member.
  // The models are the ones already required at the top of this file.
  const modelsByKind = new Map([
    ["table", Table],
    ["field", Field],
    ["view", View],
    ["page", Page],
    ["trigger", Trigger],
    ["role", Role],
    ["library", Library],
    ["tag", Tag]
  ]);
  const Model = modelsByKind.get(kind);
  if (!Model) return null;

  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) return null;

  await refreshState();
  const inst = await Model.findOne({ id: mapping.current_id });
  if (!inst) return null;
  return { instance: inst, kind: kind };
};
// Serialize a value to JSON text with object keys sorted at every depth, so
// two values that differ only in key order produce identical text. Array order
// is preserved. The null-prototype scratch object keeps a "__proto__" key an
// ordinary own property.
const canonicalJsonForDiff = (value) =>
  JSON.stringify(value, (key, node) => {
    if (node === null || typeof node !== "object" || Array.isArray(node)) return node;
    const ordered = Object.create(null);
    for (const name of Object.keys(node).sort()) ordered[name] = node[name];
    return ordered;
  });

// Compute fields where the incoming op's patch diverges from the local entity's
// current state. Only meaningful for update_X conflicts.
//
// Returns an object:
//   { diffs: [], reason }       when the op is not an update_X op or the
//                               entity is not present locally;
//   { diffs, kind, instance }   otherwise, where diffs is an array of
//                               { field, currentValue, incomingValue } for
//                               every patch field whose value differs.
// Values are compared by their JSON form with object keys sorted, so objects
// that hold the same data in a different key order are not reported.
const conflictFieldDiff = async (incomingOp) => {
  if (!incomingOp.op_type || !incomingOp.op_type.startsWith("update_")) {
    return { diffs: [], reason: "merge only meaningful for update ops" };
  }
  const found = await findEntityForConflict(incomingOp);
  if (!found) {
    return { diffs: [], reason: "entity not present locally" };
  }
  const payload =
    typeof incomingOp.payload === "string"
      ? JSON.parse(incomingOp.payload)
      : (incomingOp.payload || {});
  const patch = payload.patch || {};
  const diffs = [];
  for (const [field, incomingValue] of Object.entries(patch)) {
    const currentValue = found.instance[field];
    if (canonicalJsonForDiff(currentValue) === canonicalJsonForDiff(incomingValue)) {
      continue;
    }
    diffs.push({
      field: field,
      currentValue: currentValue,
      incomingValue: incomingValue
    });
  }
  return { diffs: diffs, kind: found.kind, instance: found.instance };
};
// Resolve a conflict by writing a hand-picked set of field values.
//
//   opId    - op_id of a row in _dd_ops that is in 'conflict' status and whose
//             op_type starts with "update_".
//   choices - { field: chosenValue }; only the fields to be written. When it
//             is missing or empty no model update is issued.
//
// Views, pages and triggers are written through their model's static update;
// every other kind is written through the instance's own update method. The
// conflict row is then marked status='merged' whether or not anything was
// written.
//
// Returns { status: "merged", applied } where applied is choices (or {}).
// Throws when the op is missing, not in conflict, not an update op, or its
// entity is not present locally.
const resolveConflictByMerge = async (opId, choices) => {
  const op = await db.selectMaybeOne("_dd_ops", { op_id: opId });
  if (!op) throw new Error(`op ${opId} not found`);
  if (op.status !== "conflict") throw new Error(`op ${opId} is not in conflict status (status=${op.status})`);
  if (!op.op_type.startsWith("update_")) {
    throw new Error(`merge only supported for update_X ops, got ${op.op_type}`);
  }

  const found = await findEntityForConflict(op);
  if (!found) throw new Error(`entity ${op.entity_uuid} not present locally`);

  const hasChoices = Boolean(choices) && Object.keys(choices).length > 0;
  if (hasChoices) {
    // Note the differing argument orders of the static update methods.
    switch (found.kind) {
      case "view":
        await View.update(choices, found.instance.id);
        break;
      case "page":
        await Page.update(found.instance.id, choices);
        break;
      case "trigger":
        await Trigger.update(found.instance.id, choices);
        break;
      default:
        await found.instance.update(choices);
    }
  }

  const resolvedAt = new Date().toISOString();
  await db.updateWhere(
    "_dd_ops",
    { status: "merged", conflict_with_op_id: null, applied_at: resolvedAt },
    { op_id: opId }
  );
  return { status: "merged", applied: choices || {} };
};
module.exports = {
applyBatch,
resolveConflict,
resolveConflictByMerge,
conflictFieldDiff,
HANDLERS
};