// Apply handlers — replay an op from a peer onto this instance.
//
// Each handler:
//   - Resolves the op's entity_uuid (and any parent_uuid) to this instance's
//     local integer id via _dd_entity_ids.
//   - Invokes the Saltcorn model method to reproduce the change.
//   - Updates _dd_entity_ids (insert / rename / remove) accordingly.
//
// While a handler runs, the CRUD wraps are suppressed (no journal writes, no
// auto-UUID assignment) — we manage those side effects ourselves so the
// foreign op_id and entity_uuid are preserved across instances.

const db = require("@saltcorn/data/db");
const Table = require("@saltcorn/data/models/table");
const Field = require("@saltcorn/data/models/field");
const View = require("@saltcorn/data/models/view");
const Page = require("@saltcorn/data/models/page");
const Trigger = require("@saltcorn/data/models/trigger");
const Role = require("@saltcorn/data/models/role");
const Library = require("@saltcorn/data/models/library");
const Tag = require("@saltcorn/data/models/tag");
const TableConstraint = require("@saltcorn/data/models/table_constraints");
const File = require("@saltcorn/data/models/file");
const PageGroup = require("@saltcorn/data/models/page_group");
const PageGroupMember = require("@saltcorn/data/models/page_group_member");
const WorkflowStep = require("@saltcorn/data/models/workflow_step");
const { lookupByUuid, adoptUuid, updateName, removeEntityRow, lookupByCurrent, constraintDisplayName } = require("./entityIds");
const { runSuppressed } = require("./context");
const { ENTITY_KINDS, fileLocationToId } = require("./constants");
const { refreshState } = require("./state");
const { sha256Buffer, writeFileBytes, toAbsolutePath } = require("./files");
const { signedFetchBinary } = require("./transport");
const peers = require("./peers");
const { getEnv } = require("./env");
const { fromPlaceholders } = require("./payloadRefs");
const { ensureManagedSchema, setRowUuid, findIdByRowUuid, COLUMN_NAME: ROW_UUID_COL } = require("./rowIdentity");
const { portableToRow } = require("./rowPayload");

// Strip surrogate keys that don't translate across instances. Returns a new
// object with all non-id properties. `extraIdKeys` optionally names further
// keys to drop; the input object is never mutated.
const stripSurrogateKeys = (obj, extraIdKeys) => {
  const drop = new Set(["id", "table_id", "view_id", "page_id", "role_id_for_create", ...(extraIdKeys || [])]);
  const out = {};
  for (const k of Object.keys(obj || {})) {
    if (drop.has(k)) continue;
    out[k] = obj[k];
  }
  return out;
};

// Look up the local mapping for `uuid`, throwing when there is none.
// `kind` is only used to make the error message more helpful.
const requireLocalEntity = async (uuid, kind) => {
  const m = await lookupByUuid(uuid);
  if (!m) {
    throw new Error(`local entity for uuid=${uuid} (kind=${kind}) not found`);
  }
  return m;
};

// ---------------- Tables ----------------

// Create a table from payload.after and map op.entity_uuid to the new local id.
// No-op when the uuid is already mapped.
const applyCreateTable = async ({ op, payload }) => {
  const after = payload.after || {};
  if (!after.name) {
    throw new Error("create_table missing after.name");
  }
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const opts = stripSurrogateKeys(after);
  // Table.create takes the name separately from the remaining options.
  const tableName = opts.name;
  delete opts.name;
  const t = await Table.create(tableName, opts);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.TABLE, t.id, t.name, null);
  return { status: "created", local_id: t.id };
};

// Apply payload.patch to the mapped table and record a rename if the name changed.
const applyUpdateTable = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.TABLE);
  const t = Table.findOne({ id: mapping.current_id });
  if (!t) {
    throw new Error(`table id=${mapping.current_id} not found`);
  }
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await t.update(patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.TABLE, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped table (if it still exists) and remove its mapping row.
const applyDropTable = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const t = Table.findOne({ id: mapping.current_id });
  if (!t) {
    // Mapping is stale: clean it up even though there is nothing to delete.
    await removeEntityRow(ENTITY_KINDS.TABLE, mapping.current_id);
    return { status: "noop", reason: "table not in saltcorn" };
  }
  await t.delete();
  await removeEntityRow(ENTITY_KINDS.TABLE, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Fields ----------------

// Create a field on the table identified by payload.parent_uuid.
const applyCreateField = async ({ op, payload }) => {
  const after = payload.after || {};
  const parentUuid = payload.parent_uuid;
  if (!parentUuid) {
    throw new Error("create_field missing parent_uuid");
  }
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const parent = await requireLocalEntity(parentUuid, ENTITY_KINDS.TABLE);
  const table = Table.findOne({ id: parent.current_id });
  if (!table) {
    throw new Error(`parent table id=${parent.current_id} not found`);
  }
  const cfg = stripSurrogateKeys(after);
  // Re-point the field at this instance's id for the parent table.
  cfg.table_id = table.id;
  const f = await Field.create(cfg);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.FIELD, f.id, f.name, parentUuid);
  return { status: "created", local_id: f.id };
};

// Apply payload.patch to the mapped field and record a rename if the name changed.
const applyUpdateField = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.FIELD);
  const f = await Field.findOne({ id: mapping.current_id });
  if (!f) {
    throw new Error(`field id=${mapping.current_id} not found`);
  }
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await f.update(patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.FIELD, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped field (if it still exists) and remove its mapping row.
const applyDropField = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const f = await Field.findOne({ id: mapping.current_id });
  if (!f) {
    await removeEntityRow(ENTITY_KINDS.FIELD, mapping.current_id);
    return { status: "noop" };
  }
  await f.delete();
  await removeEntityRow(ENTITY_KINDS.FIELD, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Views ----------------

// Create a view; payload.parent_uuid (optional) names the table it belongs to.
const applyCreateView = async ({ op, payload }) => {
  const after = payload.after || {};
  const parentUuid = payload.parent_uuid;
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const cfg = stripSurrogateKeys(after);
  if (parentUuid) {
    const parent = await requireLocalEntity(parentUuid, ENTITY_KINDS.TABLE);
    cfg.table_id = parent.current_id;
  }
  const v = await View.create(cfg);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.VIEW, v.id, v.name, parentUuid || null);
  return { status: "created", local_id: v.id };
};

// Apply payload.patch to the mapped view and record a rename if the name changed.
const applyUpdateView = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.VIEW);
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await View.update(patch, mapping.current_id);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.VIEW, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped view (if it still exists) and remove its mapping row.
const applyDropView = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const v = View.findOne({ id: mapping.current_id });
  if (!v) {
    await removeEntityRow(ENTITY_KINDS.VIEW, mapping.current_id);
    return { status: "noop" };
  }
  await v.delete();
  await removeEntityRow(ENTITY_KINDS.VIEW, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Pages ----------------

// Create a page from payload.after and map op.entity_uuid to it.
const applyCreatePage = async ({ op, payload }) => {
  const after = payload.after || {};
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const cfg = stripSurrogateKeys(after);
  const p = await Page.create(cfg);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.PAGE, p.id, p.name, null);
  return { status: "created", local_id: p.id };
};

// Apply payload.patch to the mapped page and record a rename if the name changed.
const applyUpdatePage = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.PAGE);
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await Page.update(mapping.current_id, patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.PAGE, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped page (if it still exists) and remove its mapping row.
const applyDropPage = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const p = Page.findOne({ id: mapping.current_id });
  if (!p) {
    await removeEntityRow(ENTITY_KINDS.PAGE, mapping.current_id);
    return { status: "noop" };
  }
  await p.delete();
  await removeEntityRow(ENTITY_KINDS.PAGE, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Triggers ----------------

// Create a trigger; payload.parent_uuid (optional) names the table it is bound to.
const applyCreateTrigger = async ({ op, payload }) => {
  const after = payload.after || {};
  const parentUuid = payload.parent_uuid;
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const cfg = stripSurrogateKeys(after);
  if (parentUuid) {
    const parent = await requireLocalEntity(parentUuid, ENTITY_KINDS.TABLE);
    cfg.table_id = parent.current_id;
  }
  const tr = await Trigger.create(cfg);
  // Unnamed triggers get a synthetic display name derived from the local id.
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.TRIGGER, tr.id, tr.name || `trigger_${tr.id}`, parentUuid || null);
  return { status: "created", local_id: tr.id };
};

// Apply payload.patch to the mapped trigger and record a rename if the name changed.
const applyUpdateTrigger = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.TRIGGER);
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await Trigger.update(mapping.current_id, patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.TRIGGER, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped trigger (if it still exists) and remove its mapping row.
const applyDropTrigger = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const tr = Trigger.findOne({ id: mapping.current_id });
  if (!tr) {
    await removeEntityRow(ENTITY_KINDS.TRIGGER, mapping.current_id);
    return { status: "noop" };
  }
  await tr.delete();
  await removeEntityRow(ENTITY_KINDS.TRIGGER, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Roles ----------------

// Create a role. Unlike other entities the role keeps the id sent by the peer
// (after.id is passed straight to Role.create and used as the mapped id).
const applyCreateRole = async ({ op, payload }) => {
  const after = payload.after || {};
  if (!after.id || !after.role) {
    throw new Error("create_role missing after.id or after.role");
  }
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  await Role.create({ id: after.id, role: after.role });
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.ROLE, after.id, after.role, null);
  return { status: "created", local_id: after.id };
};

// Apply payload.patch to the mapped role; the role's display name is `role`.
const applyUpdateRole = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.ROLE);
  const r = await Role.findOne({ id: mapping.current_id });
  const patch = payload.patch || {};
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  // Fail with a descriptive error instead of a TypeError on `r.update`.
  if (!r) {
    throw new Error(`role id=${mapping.current_id} not found`);
  }
  await r.update(patch);
  if (patch.role !== undefined && patch.role !== mapping.current_name) {
    await updateName(ENTITY_KINDS.ROLE, mapping.current_id, patch.role);
  }
  return { status: "updated" };
};

// Delete the mapped role (if it still exists) and remove its mapping row.
const applyDropRole = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop" };
  }
  const r = await Role.findOne({ id: mapping.current_id });
  if (r) {
    await r.delete();
  }
  await removeEntityRow(ENTITY_KINDS.ROLE, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Library ----------------

// Create a library entry. The created row is re-read by name to obtain its
// local id before the uuid is adopted.
const applyCreateLibrary = async ({ op, payload }) => {
  const after = payload.after || {};
  if (!after.name) {
    throw new Error("create_library missing after.name");
  }
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const cfg = stripSurrogateKeys(after);
  await Library.create(cfg);
  const fresh = await Library.findOne({ name: after.name });
  if (!fresh) {
    throw new Error("library not found after create");
  }
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.LIBRARY, fresh.id, fresh.name, null);
  return { status: "created", local_id: fresh.id };
};

// Apply payload.patch to the mapped library entry and record a rename if needed.
const applyUpdateLibrary = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.LIBRARY);
  const li = await Library.findOne({ id: mapping.current_id });
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  // Fail with a descriptive error instead of a TypeError on `li.update`.
  if (!li) {
    throw new Error(`library id=${mapping.current_id} not found`);
  }
  await li.update(patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.LIBRARY, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped library entry (if it still exists) and remove its mapping row.
const applyDropLibrary = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop" };
  }
  const li = await Library.findOne({ id: mapping.current_id });
  if (li) {
    await li.delete();
  }
  await removeEntityRow(ENTITY_KINDS.LIBRARY, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Tags ----------------

// Create a tag from payload.after and map op.entity_uuid to it.
const applyCreateTag = async ({ op, payload }) => {
  const after = payload.after || {};
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const cfg = stripSurrogateKeys(after);
  const tg = await Tag.create(cfg);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.TAG, tg.id, tg.name, null);
  return { status: "created", local_id: tg.id };
};

// Apply payload.patch to the mapped tag and record a rename if the name changed.
const applyUpdateTag = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, ENTITY_KINDS.TAG);
  const tg = await Tag.findOne({ id: mapping.current_id });
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  // Fail with a descriptive error instead of a TypeError on `tg.update`.
  if (!tg) {
    throw new Error(`tag id=${mapping.current_id} not found`);
  }
  await tg.update(patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName(ENTITY_KINDS.TAG, mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped tag (if it still exists) and remove its mapping row.
const applyDropTag = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop" };
  }
  const tg = await Tag.findOne({ id: mapping.current_id });
  if (tg) {
    await tg.delete();
  }
  await removeEntityRow(ENTITY_KINDS.TAG, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Files ----------------

// Fetch the file bytes from the originating peer (opts.peerId), verify them
// against after.content_hash, write them to disk and register the File row.
// The mapping uses a synthetic id derived from the relative path.
const applyCreateFile = async ({ op, payload, opts }) => {
  const after = payload.after || {};
  const relPath = after.relative_path;
  if (!relPath) throw new Error("create_file missing relative_path");
  if (!after.content_hash) throw new Error("create_file missing content_hash");
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  if (!opts || !opts.peerId) {
    throw new Error("file apply requires opts.peerId to fetch binary from");
  }
  const peer = await peers.findPeer(opts.peerId);
  if (!peer) throw new Error(`peer ${opts.peerId} not found`);
  const secret = await peers.peerSecret(opts.peerId);
  const env = await getEnv();
  const r = await signedFetchBinary({
    baseUrl: peer.base_url,
    method: "GET",
    path: `/dev-deploy/api/file/${encodeURIComponent(op.entity_uuid)}`,
    body: null,
    sourceEnvId: env.env_id,
    secret: secret,
    requireTls: peer.require_tls
  });
  if (!r.ok) {
    throw new Error(`binary fetch returned ${r.status}`);
  }
  const bytes = r.bytes;
  // Refuse to write anything whose hash differs from what the op announced.
  const actualHash = sha256Buffer(bytes);
  if (actualHash !== after.content_hash) {
    throw new Error(`content hash mismatch: expected ${after.content_hash}, got ${actualHash}`);
  }
  const absPath = toAbsolutePath(File, db, relPath);
  await writeFileBytes(absPath, bytes);
  await File.create({
    filename: after.filename,
    location: absPath,
    uploaded_at: new Date(),
    size_kb: after.size_kb,
    mime_super: after.mime_super,
    mime_sub: after.mime_sub,
    min_role_read: after.min_role_read
  });
  const synthId = fileLocationToId(relPath);
  await adoptUuid(op.entity_uuid, ENTITY_KINDS.FILE, synthId, relPath, null);
  return { status: "created", location: absPath, bytes: bytes.length };
};

// Delete the file whose relative path is stored as the mapping's current_name.
const applyDropFile = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const absPath = toAbsolutePath(File, db, mapping.current_name);
  const file = await File.findOne({ location: absPath });
  if (file) {
    await file.delete();
  }
  await removeEntityRow(ENTITY_KINDS.FILE, mapping.current_id);
  return { status: "dropped" };
};

// ---------------- TableConstraints ----------------

// Create a table constraint on the table identified by payload.parent_uuid.
const applyCreateConstraint = async ({ op, payload }) => {
  const after = payload.after || {};
  const parentUuid = payload.parent_uuid;
  if (!parentUuid) throw new Error("create_constraint missing parent_uuid");
  if (!after.type) throw new Error("create_constraint missing type");
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  const parent = await requireLocalEntity(parentUuid, "table");
  const cfg = {
    table_id: parent.current_id,
    type: after.type,
    configuration: after.configuration || {}
  };
  const result = await TableConstraint.create(cfg);
  await adoptUuid(op.entity_uuid, "constraint", result.id, constraintDisplayName(result), parentUuid);
  return { status: "created", local_id: result.id };
};

// Delete the mapped constraint (if it still exists) and remove its mapping row.
const applyDropConstraint = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) {
    return { status: "noop", reason: "already dropped" };
  }
  const con = await TableConstraint.findOne({ id: mapping.current_id });
  if (con) {
    await con.delete();
  }
  await removeEntityRow("constraint", mapping.current_id);
  return { status: "dropped" };
};

// ---------------- PageGroup ----------------

// Create a page group and then attach each entry of payload.members, resolving
// every member's page_uuid to the local page id.
const applyCreatePageGroup = async ({ op, payload }) => {
  const after = payload.after || {};
  if (!after.name) throw new Error("create_page_group missing after.name");
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "uuid already mapped" };
  }
  // Create the group first, empty members. PageGroupMember's constructor
  // demands page_group_id which doesn't exist until after the insert.
  const cfg = stripSurrogateKeys(after);
  cfg.members = [];
  const result = await PageGroup.create(cfg);
  // Now resolve each member's page_uuid to local page_id and addMember.
  for (const m of payload.members || []) {
    if (!m.page_uuid) {
      throw new Error("page_group member missing page_uuid");
    }
    const local = await lookupByUuid(m.page_uuid);
    if (!local) {
      throw new Error(`page_group member references unmapped page_uuid=${m.page_uuid}`);
    }
    await result.addMember({
      page_id: local.current_id,
      eligible_formula: m.eligible_formula,
      description: m.description,
      sequence: m.sequence
    });
  }
  await adoptUuid(op.entity_uuid, "page_group", result.id, result.name, null);
  return { status: "created", local_id: result.id };
};

// Apply payload.patch to the mapped page group and record a rename if needed.
const applyUpdatePageGroup = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, "page_group");
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await PageGroup.update(mapping.current_id, patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName("page_group", mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped page group (if it still exists) and remove its mapping row.
const applyDropPageGroup = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) return { status: "noop" };
  const pg = PageGroup.findOne({ id: mapping.current_id });
  if (pg) await pg.delete();
  await removeEntityRow("page_group", mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Table rows (managed/starter) ----------------

// Resolve a table uuid to the local Table, or null when the uuid is unmapped
// or mapped to a non-table entity. State is refreshed before the lookup.
const findLocalTableByUuid = async (tableUuid) => {
  const ent = await lookupByUuid(tableUuid);
  if (!ent || ent.kind !== "table") return null;
  await refreshState();
  return Table.findOne({ id: ent.current_id });
};

// Insert a row carrying op.entity_uuid as its row uuid; no-op if already present.
const applyInsertRow = async ({ op, payload }) => {
  if (!payload || !payload.table_uuid) throw new Error("insert_row missing table_uuid");
  if (!op.entity_uuid) throw new Error("insert_row missing row_uuid (entity_uuid)");
  const tbl = await findLocalTableByUuid(payload.table_uuid);
  if (!tbl) throw new Error(`local table for uuid=${payload.table_uuid} not found`);
  await ensureManagedSchema(tbl.name);
  // Idempotency: if a row with this uuid already exists, skip
  const existing = await findIdByRowUuid(tbl.name, op.entity_uuid);
  if (existing) {
    return { status: "noop", reason: "row uuid already present" };
  }
  const rowData = await portableToRow(payload.after || {}, tbl);
  const newId = await tbl.insertRow(rowData);
  if (!newId) throw new Error("insertRow returned no id");
  await setRowUuid(tbl.name, newId, op.entity_uuid);
  return { status: "inserted", local_id: newId };
};

// Update the row identified by op.entity_uuid, inserting it from the patch
// when it does not exist locally yet.
const applyUpdateRow = async ({ op, payload }) => {
  if (!payload || !payload.table_uuid) throw new Error("update_row missing table_uuid");
  if (!op.entity_uuid) throw new Error("update_row missing row_uuid");
  const tbl = await findLocalTableByUuid(payload.table_uuid);
  if (!tbl) throw new Error(`local table for uuid=${payload.table_uuid} not found`);
  await ensureManagedSchema(tbl.name);
  const localId = await findIdByRowUuid(tbl.name, op.entity_uuid);
  if (!localId) {
    // The row doesn't exist on target yet — treat as insert
    const rowData = await portableToRow(payload.patch || {}, tbl);
    const newId = await tbl.insertRow(rowData);
    // Same guard as applyInsertRow: never tag a row uuid onto a missing id.
    if (!newId) throw new Error("insertRow returned no id");
    await setRowUuid(tbl.name, newId, op.entity_uuid);
    return { status: "inserted_for_update", local_id: newId };
  }
  const patch = await portableToRow(payload.patch || {}, tbl);
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await tbl.updateRow(patch, localId);
  return { status: "updated", local_id: localId };
};

// Delete the row identified by op.entity_uuid; no-op when table or row is absent.
const applyDropRow = async ({ op, payload }) => {
  if (!payload || !payload.table_uuid) throw new Error("drop_row missing table_uuid");
  if (!op.entity_uuid) throw new Error("drop_row missing row_uuid");
  const tbl = await findLocalTableByUuid(payload.table_uuid);
  if (!tbl) return { status: "noop", reason: "table not present locally" };
  const localId = await findIdByRowUuid(tbl.name, op.entity_uuid);
  if (!localId) return { status: "noop", reason: "row uuid not present locally" };
  await tbl.deleteRows({ id: localId });
  return { status: "dropped", local_id: localId };
};

// Upsert the table's data mode in _dd_table_modes (defaults to "user"). For
// "managed" and "starter" the managed schema is ensured first.
const applySetTableMode = async ({ op, payload }) => {
  if (!payload || !payload.table_uuid) throw new Error("set_table_mode missing table_uuid");
  const tbl = await findLocalTableByUuid(payload.table_uuid);
  if (!tbl) throw new Error(`local table for uuid=${payload.table_uuid} not found`);
  const mode = payload.data_mode || "user";
  if (mode === "managed" || mode === "starter") {
    await ensureManagedSchema(tbl.name);
  }
  const now = new Date().toISOString();
  const existing = await db.selectMaybeOne("_dd_table_modes", { table_uuid: payload.table_uuid });
  if (existing) {
    await db.updateWhere("_dd_table_modes", { data_mode: mode, updated_at: now }, { table_uuid: payload.table_uuid });
  } else {
    await db.insert("_dd_table_modes", { table_uuid: payload.table_uuid, data_mode: mode, updated_at: now }, { noid: true });
  }
  return { status: "set", mode: mode };
};

// ---------------- Plugin configuration ----------------

// Replace the configuration of an installed plugin. Ops targeting the
// "dev-deploy" plugin itself are skipped.
const applyUpdatePluginConfig = async ({ op, payload }) => {
  const Plugin = require("@saltcorn/data/models/plugin");
  if (!payload || !payload.name) {
    throw new Error("update_plugin_config missing payload.name");
  }
  if (payload.name === "dev-deploy") {
    return { status: "noop", reason: "skipping our own plugin" };
  }
  const plugin = await Plugin.findOne({ name: payload.name });
  if (!plugin) {
    throw new Error(`plugin "${payload.name}" is not installed on this instance`);
  }
  plugin.configuration = payload.configuration || {};
  await plugin.upsert();
  return { status: "updated", plugin: payload.name };
};

// ---------------- Config (menu_items etc.) ----------------

// Set a single configuration key via the Saltcorn state.
const applySetConfig = async ({ op, payload }) => {
  const { key, value } = payload || {};
  if (!key) throw new Error("set_config missing key");
  const { getState } = require("@saltcorn/data/db/state");
  await getState().setConfig(key, value);
  return { status: "set", key: key };
};

// ---------------- PageGroupMember ----------------

// Add a member to the page group named by payload.parent_uuid, pointing at the
// page named by after.page_uuid.
const applyCreatePageGroupMember = async ({ op, payload }) => {
  const after = payload.after || {};
  const parentUuid = payload.parent_uuid;
  if (!parentUuid) throw new Error("create_page_group_member missing parent_uuid");
  if (!after.page_uuid) throw new Error("create_page_group_member missing page_uuid");
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) return { status: "noop", reason: "uuid already mapped" };
  const group = await requireLocalEntity(parentUuid, "page_group");
  const page = await requireLocalEntity(after.page_uuid, "page");
  const pg = PageGroup.findOne({ id: group.current_id });
  if (!pg) throw new Error(`local page_group id=${group.current_id} not found`);
  const member = await pg.addMember({
    page_id: page.current_id,
    eligible_formula: after.eligible_formula,
    description: after.description
  });
  await adoptUuid(op.entity_uuid, "page_group_member", member.id, `member_${member.id}`, parentUuid);
  return { status: "created", local_id: member.id };
};

// Delete the mapped page group member and remove its mapping row.
const applyDropPageGroupMember = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) return { status: "noop" };
  await PageGroupMember.delete(mapping.current_id);
  await removeEntityRow("page_group_member", mapping.current_id);
  return { status: "dropped" };
};

// ---------------- WorkflowStep ----------------

// Create a workflow step; payload.parent_uuid (optional) names its trigger.
const applyCreateWorkflowStep = async ({ op, payload }) => {
  const after = payload.after || {};
  const parentUuid = payload.parent_uuid;
  const existing = await lookupByUuid(op.entity_uuid);
  if (existing) return { status: "noop", reason: "uuid already mapped" };
  let parentTriggerId = null;
  if (parentUuid) {
    const parent = await requireLocalEntity(parentUuid, "trigger");
    parentTriggerId = parent.current_id;
  }
  const cfg = stripSurrogateKeys(after);
  if (parentTriggerId) cfg.trigger_id = parentTriggerId;
  const result = await WorkflowStep.create(cfg);
  // WorkflowStep.create returns the inserted integer id, not an instance.
  const id = typeof result === "number" ? result : (result && result.id);
  if (!id) throw new Error("WorkflowStep.create returned no id");
  await adoptUuid(op.entity_uuid, "workflow_step", id, after.name || `step_${id}`, parentUuid || null);
  return { status: "created", local_id: id };
};

// Apply payload.patch to the mapped workflow step and record a rename if needed.
const applyUpdateWorkflowStep = async ({ op, payload }) => {
  const mapping = await requireLocalEntity(op.entity_uuid, "workflow_step");
  const ws = await WorkflowStep.findOne({ id: mapping.current_id });
  if (!ws) throw new Error(`workflow_step id=${mapping.current_id} not found`);
  const patch = stripSurrogateKeys(payload.patch || {});
  if (Object.keys(patch).length === 0) {
    return { status: "noop", reason: "empty patch" };
  }
  await ws.update(patch);
  if (patch.name !== undefined && patch.name !== mapping.current_name) {
    await updateName("workflow_step", mapping.current_id, patch.name);
  }
  return { status: "updated" };
};

// Delete the mapped workflow step (if it still exists) and remove its mapping row.
const applyDropWorkflowStep = async ({ op, payload }) => {
  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) return { status: "noop" };
  const ws = await WorkflowStep.findOne({ id: mapping.current_id });
  if (ws) await ws.delete();
  await removeEntityRow("workflow_step", mapping.current_id);
  return { status: "dropped" };
};

// ---------------- Dispatch ----------------

// op_type -> handler. Every handler receives { op, payload, opts }.
const HANDLERS = {
  create_table: applyCreateTable,
  update_table: applyUpdateTable,
  drop_table: applyDropTable,
  create_field: applyCreateField,
  update_field: applyUpdateField,
  drop_field: applyDropField,
  create_view: applyCreateView,
  update_view: applyUpdateView,
  drop_view: applyDropView,
  create_page: applyCreatePage,
  update_page: applyUpdatePage,
  drop_page: applyDropPage,
  create_trigger: applyCreateTrigger,
  update_trigger: applyUpdateTrigger,
  drop_trigger: applyDropTrigger,
  create_role: applyCreateRole,
  update_role: applyUpdateRole,
  drop_role: applyDropRole,
  create_library: applyCreateLibrary,
  update_library: applyUpdateLibrary,
  drop_library: applyDropLibrary,
  create_tag: applyCreateTag,
  update_tag: applyUpdateTag,
  drop_tag: applyDropTag,
  create_constraint: applyCreateConstraint,
  drop_constraint: applyDropConstraint,
  create_file: applyCreateFile,
  drop_file: applyDropFile,
  create_page_group: applyCreatePageGroup,
  update_page_group: applyUpdatePageGroup,
  drop_page_group: applyDropPageGroup,
  create_page_group_member: applyCreatePageGroupMember,
  drop_page_group_member: applyDropPageGroupMember,
  create_workflow_step: applyCreateWorkflowStep,
  update_workflow_step: applyUpdateWorkflowStep,
  drop_workflow_step: applyDropWorkflowStep,
  set_config: applySetConfig,
  update_plugin_config: applyUpdatePluginConfig,
  insert_row: applyInsertRow,
  update_row: applyUpdateRow,
  drop_row: applyDropRow,
  set_table_mode: applySetTableMode
};

// Record an op into our journal with its source-side identity preserved.
// applied_at is set only when `applied` is truthy (otherwise null); status may
// be 'committed' (apply succeeded), 'skipped_cascade' (parent in same batch
// handled it), 'error' (apply failed), or 'conflict' (a local op touched the
// same entity since the last sync with this peer).
// `extra.conflict_with_op_id`, when given, is stored alongside the op.
const persistOp = async (op, status, applied, extra) => {
  const payload = typeof op.payload === "string" ? op.payload : JSON.stringify(op.payload || {});
  const row = {
    op_id: op.op_id,
    source_env_id: op.source_env_id,
    op_type: op.op_type,
    entity_kind: op.entity_kind,
    entity_uuid: op.entity_uuid,
    payload: payload,
    parent_op_id: op.parent_op_id,
    correlation_id: op.correlation_id,
    schema_version: op.schema_version || 1,
    created_at: op.created_at,
    applied_at: applied ? new Date().toISOString() : null,
    status: status,
    conflict_with_op_id: (extra && extra.conflict_with_op_id) || null
  };
  await db.insert("_dd_ops", row, { noid: true });
};

// Find the most recent local op on the same entity_uuid that was applied
// after our last inbound sync from this peer. If one exists, this incoming op
// represents concurrent divergent changes -- a conflict.
//
// Returns the local op_id if conflicting, null otherwise. Detection is skipped
// (null) when the op has no entity_uuid or opts lacks peerId / myEnvId.
const findConflictingLocalOp = async (op, opts) => {
  if (!opts || !op.entity_uuid || !opts.peerId || !opts.myEnvId) return null;
  const anchor = await db.selectMaybeOne("_dd_anchors", { peer_id: opts.peerId, direction: "inbound" });
  // Without an anchor every local op counts as "since the last sync".
  let cutoff = "1970-01-01T00:00:00.000Z";
  if (anchor) {
    const anchorOp = await db.selectMaybeOne("_dd_ops", { op_id: anchor.last_op_id });
    if (anchorOp && anchorOp.applied_at) cutoff = anchorOp.applied_at;
  }
  const schema = db.getTenantSchemaPrefix();
  const rs = await db.query(
    `SELECT op_id FROM ${schema}_dd_ops
      WHERE entity_uuid = $1
        AND source_env_id = $2
        AND applied_at IS NOT NULL
        AND applied_at > $3
        AND status NOT IN ('rejected', 'reverted')
      ORDER BY applied_at DESC
      LIMIT 1`,
    [op.entity_uuid, opts.myEnvId, cutoff]
  );
  return rs.rows.length > 0 ? rs.rows[0].op_id : null;
};

// Apply a batch of ops in created_at order. Children whose parent_op_id is in
// the same batch are journaled but not re-applied — their parent's apply will
// reproduce the cascade locally.
//
// opts.peerId + opts.myEnvId enable conflict detection: if an incoming op's
// entity_uuid has a local op applied since the last sync with this peer, the
// incoming op is journaled with status='conflict' instead of being applied.
// The admin resolves via /admin/dev-deploy/conflicts.
/**
 * Replay ops received from a peer onto this instance and journal each one.
 *
 * @param {object[]} ops - ops to apply; each op's payload may be a JSON string
 *   or an already-parsed object. The input array and its ops are not mutated.
 * @param {object} [opts] - forwarded to every handler; opts.peerId and
 *   opts.myEnvId additionally switch on conflict detection.
 * @returns {Promise<object[]>} one entry per op, in apply order, with
 *   status 'already_applied' | 'skipped_cascade' | 'conflict' | 'error' |
 *   'applied' (the journal row for 'applied' carries status 'committed').
 */
const applyBatch = async (ops, opts) => {
  const options = opts || {};
  await refreshState();

  const sorted = [...ops].sort((a, b) =>
    String(a.created_at).localeCompare(String(b.created_at))
  );
  const opIdSet = new Set(sorted.map((o) => o.op_id));
  const results = [];

  for (const op of sorted) {
    // Idempotency: an op_id that is already journaled is never replayed.
    const existing = await db.selectMaybeOne("_dd_ops", { op_id: op.op_id });
    if (existing) {
      results.push({ op_id: op.op_id, status: "already_applied" });
      continue;
    }

    // The parent is in this batch, so its handler reproduces this child.
    if (op.parent_op_id && opIdSet.has(op.parent_op_id)) {
      await persistOp(op, "skipped_cascade", true);
      results.push({ op_id: op.op_id, status: "skipped_cascade" });
      continue;
    }

    const conflictWith = await findConflictingLocalOp(op, options);
    if (conflictWith) {
      // applied=false: the op stays pending until the admin resolves it.
      await persistOp(op, "conflict", false, { conflict_with_op_id: conflictWith });
      results.push({ op_id: op.op_id, status: "conflict", conflict_with: conflictWith });
      continue;
    }

    const handler = HANDLERS[op.op_type];
    if (!handler) {
      await persistOp(op, "error", true);
      results.push({ op_id: op.op_id, status: "error", error: `no handler for ${op.op_type}` });
      continue;
    }

    // String() so a non-string op_id cannot make the logging itself throw.
    const shortId = String(op.op_id).slice(0, 8);
    try {
      // Always work on a private copy of the payload. Placeholder resolution
      // rewrites it in place, and the journal must keep the portable form the
      // peer sent, whether the payload arrived as a string or as an object.
      const payload =
        typeof op.payload === "string"
          ? JSON.parse(op.payload)
          : JSON.parse(JSON.stringify(op.payload || {}));

      // Resolve __dd_file_ref:: placeholders to local file paths before
      // handing the payload to the model handler. Best-effort: a failure is
      // reported but does not stop the apply.
      try {
        await fromPlaceholders(payload);
      } catch (e) {
        // eslint-disable-next-line no-console
        console.warn(
          `[dev-deploy] placeholder resolution failed for ${op.op_type} (op=${shortId}):`,
          e && e.message ? e.message : e
        );
      }

      const detail = await runSuppressed(() =>
        handler({ op: op, payload: payload, opts: options })
      );
      await persistOp(op, "committed", true);
      results.push({ op_id: op.op_id, status: "applied", detail: detail });
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error(
        `[dev-deploy] apply ${op.op_type} (op=${shortId}) failed:`,
        err && err.stack ? err.stack : err
      );
      await persistOp(op, "error", true);
      // Handlers may throw non-Error values (even null); never dereference blindly.
      const message = err && err.message ? err.message : String(err);
      results.push({ op_id: op.op_id, status: "error", error: message });
    }
  }
  return results;
};

// Resolve a pending conflict.
// action='theirs': apply the conflicting incoming op now (suppressed),
//                  mark status='committed', clear conflict_with_op_id.
// action='mine'  : mark the incoming op status='rejected', clear conflict_with.
//                  The local op stays as-is. The peer keeps sending this op_id
//                  on future pulls; we skip-by-id-already-present at the top of
//                  applyBatch (status check covers 'rejected' too via idempotency).
//
// opId   - op_id of a journaled op whose status is 'conflict'.
// action - 'theirs' or 'mine'.
// opts   - optional; forwarded to the handler for 'theirs'.
//
// Resolves to { status: "rejected" } for 'mine' and
// { status: "applied", detail } for 'theirs'. Throws when the op is missing,
// is not in conflict status, has no handler, or the action is unknown.
const resolveConflict = async (opId, action, opts) => {
  const options = opts || {};
  const op = await db.selectMaybeOne("_dd_ops", { op_id: opId });
  if (!op) throw new Error(`op ${opId} not found`);
  if (op.status !== "conflict") {
    throw new Error(`op ${opId} is not in conflict status (status=${op.status})`);
  }

  if (action === "mine") {
    // "Use mine" on a create_file/drop_file conflict is a pure journal
    // status change just like the metadata case -- the local state stands
    // and the incoming op is rejected; no file bytes are touched.
    await db.updateWhere(
      "_dd_ops",
      { status: "rejected", conflict_with_op_id: null, applied_at: new Date().toISOString() },
      { op_id: opId }
    );
    return { status: "rejected" };
  }

  if (action === "theirs") {
    const handler = HANDLERS[op.op_type];
    if (!handler) throw new Error(`no handler for ${op.op_type}`);
    await refreshState();
    const payload =
      typeof op.payload === "string" ? JSON.parse(op.payload) : (op.payload || {});

    // Conflict ops are journaled before placeholder resolution, so the stored
    // payload still carries __dd_file_ref:: placeholders. Resolve them to
    // local file paths here, the same way applyBatch does before invoking a
    // handler. Best-effort: a failure is reported but does not block the apply.
    try {
      await fromPlaceholders(payload);
    } catch (e) {
      // eslint-disable-next-line no-console
      console.warn(
        `[dev-deploy] placeholder resolution failed for ${op.op_type} (op=${String(opId).slice(0, 8)}):`,
        e && e.message ? e.message : e
      );
    }

    // create_file's handler needs opts.peerId to fetch the file bytes from
    // the originating peer. Pass opts through so file conflicts resolve too.
    const detail = await runSuppressed(() =>
      handler({ op: op, payload: payload, opts: options })
    );
    await db.updateWhere(
      "_dd_ops",
      { status: "committed", conflict_with_op_id: null, applied_at: new Date().toISOString() },
      { op_id: opId }
    );
    return { status: "applied", detail: detail };
  }

  throw new Error(`unknown action '${action}', expected 'theirs' or 'mine'`);
};

// Find the entity instance for a conflict op's entity_uuid.
// Returns { instance, kind } or null if the entity isn't present locally.
const findEntityForConflict = async (op) => {
  if (!op.op_type || !op.entity_uuid) return null;

  // op_type is "<verb>_<kind>"; everything after the first underscore is the kind.
  const sep = op.op_type.indexOf("_");
  if (sep < 0) return null;
  const kind = op.op_type.substring(sep + 1);

  // Kinds that can be loaded by id through a model class. The classes are the
  // ones already required at the top of this file.
  const modelsByKind = {
    table: Table,
    field: Field,
    view: View,
    page: Page,
    trigger: Trigger,
    role: Role,
    library: Library,
    tag: Tag
  };
  // Own-property check so a kind such as "constructor" cannot resolve to
  // something inherited from Object.prototype. Checked first so unsupported
  // kinds cost no lookup or state refresh.
  if (!Object.prototype.hasOwnProperty.call(modelsByKind, kind)) return null;
  const Cls = modelsByKind[kind];

  const mapping = await lookupByUuid(op.entity_uuid);
  if (!mapping) return null;

  await refreshState();
  const inst = await Cls.findOne({ id: mapping.current_id });
  if (!inst) return null;
  return { instance: inst, kind: kind };
};

// Compute the fields where the incoming op's patch diverges from the local
// entity's current state.
//
// Resolves to an object, not a bare array:
//   { diffs, kind, instance } - diffs is an array of
//                               { field, currentValue, incomingValue }
//   { diffs: [], reason }     - when the op is not an update_X op, or the
//                               entity could not be loaded locally
//
// Values are compared by their JSON form with object keys sorted, so nested
// objects that differ only in key order are treated as equal. Array order
// still matters.
const conflictFieldDiff = async (incomingOp) => {
  if (!incomingOp.op_type || !incomingOp.op_type.startsWith("update_")) {
    return { diffs: [], reason: "merge only meaningful for update ops" };
  }
  const found = await findEntityForConflict(incomingOp);
  if (!found) {
    return { diffs: [], reason: "entity not present locally" };
  }

  const payload =
    typeof incomingOp.payload === "string"
      ? JSON.parse(incomingOp.payload)
      : (incomingOp.payload || {});
  const patch = payload.patch || {};

  // JSON.stringify with a replacer that re-emits every plain object with its
  // keys in sorted order, giving a key-order-independent serialisation.
  const canonical = (value) =>
    JSON.stringify(value, (_key, v) => {
      if (v === null || typeof v !== "object" || Array.isArray(v)) return v;
      const ordered = {};
      for (const k of Object.keys(v).sort()) ordered[k] = v[k];
      return ordered;
    });

  const diffs = [];
  for (const [field, incomingValue] of Object.entries(patch)) {
    const currentValue = found.instance[field];
    if (canonical(currentValue) === canonical(incomingValue)) continue;
    diffs.push({ field: field, currentValue: currentValue, incomingValue: incomingValue });
  }
  return { diffs: diffs, kind: found.kind, instance: found.instance };
};

// Apply a manual per-field merge.
choices is { field: chosenValue } -- only // fields that should be written are included; if empty, no model update runs. // The original conflict op is marked status='merged' regardless. const resolveConflictByMerge = async (opId, choices) => { const op = await db.selectMaybeOne("_dd_ops", { op_id: opId }); if (!op) throw new Error(`op ${opId} not found`); if (op.status !== "conflict") throw new Error(`op ${opId} is not in conflict status (status=${op.status})`); if (op.op_type === "create_file" || op.op_type === "drop_file") { throw new Error(`merge is not supported for file ops (${op.op_type}); use "use theirs" or "use mine"`); } if (!op.op_type.startsWith("update_")) { throw new Error(`merge only supported for update_X ops, got ${op.op_type}`); } const found = await findEntityForConflict(op); if (!found) throw new Error(`entity ${op.entity_uuid} not present locally`); if (choices && Object.keys(choices).length > 0) { const View = require("@saltcorn/data/models/view"); const Page = require("@saltcorn/data/models/page"); const Trigger = require("@saltcorn/data/models/trigger"); if (found.kind === "view") { await View.update(choices, found.instance.id); } else if (found.kind === "page") { await Page.update(found.instance.id, choices); } else if (found.kind === "trigger") { await Trigger.update(found.instance.id, choices); } else { await found.instance.update(choices); } } await db.updateWhere("_dd_ops", { status: "merged", conflict_with_op_id: null, applied_at: new Date().toISOString() }, { op_id: opId }); return { status: "merged", applied: choices || {} }; }; module.exports = { applyBatch, resolveConflict, resolveConflictByMerge, conflictFieldDiff, HANDLERS };