fs2port/port/tools/sceneryExtract.py
2026-05-13 21:32:05 -05:00

524 lines
21 KiB
Python
Executable file

#!/usr/bin/env python3
"""
SceneryExtract -- decode FS2 .SD scenery files into a polygon database.
Walks the chunk5 bytecode interpreter logic offline: starts at sid $2C
(the dispatcher), follows every $0D HEADER (loads sub-sections), every
$18 SUB_INVOKE, and every conditional/unconditional jump. Records:
- polygon-emit ops ($00/$01/$02 xform-A, $40/$41 xform-B,
$32/$33 cached-vertex, $35 cached-plot, $2B curve)
- cache-fill ops ($31, $42)
- station records ($05 ADF, $1D NAV, $1E COM)
- frame-control ops ($07 EnterLocalFrame, $24 PushOriginWithStash)
Output: JSON containing every visited section with its decoded ops,
plus a flat list of all polygon vertices (for downstream renderers
that don't want to interpret the VM themselves).
"""
import argparse
import json
import os
import sys
# -- file-offset arithmetic ------------------------------------------------
# Geometry constants shared by sidToOffset(), Decoder.readSection() and the
# orphan-section scan in main().
SECTION_BYTES = 256 # one chunk5 "sector" = 256 bytes
# NOTE(review): SECTOR_PER_BLOCK is not referenced anywhere in the visible
# code; it appears to be kept as documentation of the ProDOS block geometry.
SECTOR_PER_BLOCK = 2 # 2 sectors per ProDOS 512-byte block
# Highest section id accepted by sidToOffset() (inclusive upper bound).
MAX_SID = 0x87 # ComputeBlockFromSector rejects sid >= $88
# Section id that main() decodes first; every other section is reached from it
# or picked up by the orphan scan.
DISPATCHER_SID = 0x2C # bootstrap (LA619) sid; common to all files
def sidToOffset(sid):
    """Map a section id to its byte offset inside the .SD file.

    Returns None when ``sid`` lies outside 0..MAX_SID (inclusive).

    The mapping is:

        file_offset = (((sid >> 2) + 1) * 16 + (sid & 3)) * 256

    i.e. the upper bits of the sid select a 16-sector group (shifted up by
    one group) and the low two bits select a sector inside that group.

    Empirically verified: FS2.1's dispatcher (sid $2C) lives at file offset
    49152, which matches MAME's captured RAM at $A800.

    The runtime path (chunk4.s::FetchSectorFromDisk + ReadBlocks via
    SmartPort) does additional shifts to convert this into a ProDOS block#
    and looks up the actual disk block via an in-memory table seeded from the
    ProDOS file index. An offline extractor works on the .SD content in
    logical block order, so the formula indexes the file directly.
    """
    in_range = 0 <= sid <= MAX_SID
    if not in_range:
        return None
    group_index = (sid >> 2) + 1   # 16-sector group, offset by one group
    slot_in_group = sid & 3        # sector within the group
    sector_number = group_index * 16 + slot_in_group
    return sector_number * SECTION_BYTES
# -- opcode metadata -------------------------------------------------------
# Each entry: (mnemonic, fixed-byte-count or None for variable)
#
# The byte count includes the opcode byte itself. A count of None marks a
# variable-length op whose size Decoder._opLen derives from the bytes of the
# record ($04 scans for a high-bit terminator, $1E reads a length byte).
# Opcodes at or below $45 that are missing from this table are treated by
# Decoder._opLen as unknown 1-byte ops; bytes with the high bit set, or above
# $45, end the walk of the current path.
OPCODES = {
    0x00: ("EmitV1XformAndPlot", 7), # xform-A V1 + plot pixel
    0x01: ("EmitV1Xform80C5", 7), # xform-A V1 silent
    0x02: ("EmitV2Xform80C5", 7), # xform-A V2 + draw
    0x03: ("Call64K_2", 6), # 64K callback #2
    0x04: ("CullByOutcodeList", None), # variable (terminator-driven)
    0x05: ("ADFRecord", 9), # freq + 24-bit XYZ
    0x06: ("DrawLine", 5), # 2D screen-coord line
    0x07: ("EnterLocalFrame", 14), # set local frame anchor
    0x08: ("Invalid08", 1),
    0x09: ("Skip3", 3),
    0x0A: ("Skip3b", 3),
    0x0B: ("JumpRelative", 3), # 16-bit signed offset
    0x0C: ("Invalid0C", 1),
    0x0D: ("Header", 6), # SECTION-LOAD opcode
    0x0E: ("Call64K", 1), # no-op in 48K mode
    0x0F: ("Invalid0F", 1),
    0x10: ("Invalid10", 1),
    0x11: ("Skip1", 1),
    0x12: ("SetColor", 2),
    0x13: ("JumpIfBeyondXY", 9), # cull XY-plane
    0x14: ("JumpIfBeyondXYZ", 11), # cull XYZ
    0x15: ("Invalid15", 1),
    0x16: ("Invalid16", 1),
    0x17: ("Invalid17", 1),
    0x18: ("SubInvoke", 3), # JSR-style relative
    0x19: ("Return", 1), # RTS
    0x1A: ("WriteWord", 5), # *dst = *src
    0x1B: ("ModeWhite", 1),
    0x1C: ("DayOnly", 1),
    0x1D: ("NAVRecord", 11), # freq + XYZ + 16-bit Z
    0x1E: ("COMRecord", None), # variable; len in cursor[1]
    0x1F: ("Invalid1F", 1),
    0x20: ("CullIfOutside1", 9),
    0x21: ("CullIfOutside2", 15),
    0x22: ("CullIfOutside3", 21),
    0x23: ("JumpIfBitsClear", 7),
    0x24: ("PushOriginWithStash", 8),
    0x25: ("StoreImmWord", 5),
    0x26: ("Invalid26", 1),
    0x27: ("Invalid27", 1),
    0x28: ("JumpIfWordCompare", 9),
    0x29: ("CopyToD2", 1),
    0x2A: ("Invalid2A", 1),
    0x2B: ("EmitCurve", 9),
    0x2C: ("Invalid2C", 1),
    0x2D: ("Invalid2D", 1),
    0x2E: ("Invalid2E", 1),
    0x2F: ("ResetState", 1),
    0x30: ("Invalid30", 1),
    0x31: ("RefreshCachedXform80C5", 8),
    0x32: ("VertexCachedV1", 2),
    0x33: ("VertexCachedV2", 2),
    0x34: ("Invalid34", 1),
    0x35: ("VertexCachedDraw", 2),
    0x40: ("EmitV1Xform7EBC", 5),
    0x41: ("EmitV2Xform7EBC", 5),
    0x42: ("RefreshCachedXform7EBC", 6),
}
# -- bytecode decoder ------------------------------------------------------
# We DON'T evaluate culls (we want to see ALL polygons). We DO evaluate:
# - $0D HEADER: load and walk the referenced section
# - $18 SUB_INVOKE: walk the referenced sub-record, then resume after the call
# - $19 RTS: terminate the current walk path
# - $0B unconditional jump: follow the jump target only
# - conditional jumps / culls: follow BOTH the jump target and the
#   fall-through, since no camera state is available offline
# This biases toward the WORST CASE of geometry the file describes,
# which is what we want for an offline asset extraction.
class Decoder:
    """Offline walker for the scenery bytecode stored in one .SD image.

    The decoder never evaluates culls or camera state. It statically explores
    the bytecode of each section (following both sides of every conditional
    branch) and records every op it reaches.

    Attributes:
        sd: raw bytes of the whole .SD file.
        sections: sid -> section record (dict holding "ops", sizes and
            coverage statistics; see decodeSection).
        visitedHeaders: set of (sid, count_sectors) pairs already decoded.
        errors: human-readable messages for sections that could not be read.
    """

    # Deepest $0D HEADER nesting that is still followed.
    _MAX_HEADER_DEPTH = 6
    # Ops treated as conditional jumps: target AND fall-through are explored.
    _CONDITIONAL_JUMPS = (0x13, 0x14, 0x20, 0x21, 0x22, 0x23, 0x28)
    # Every op whose first operand is a 16-bit signed offset (relative to the
    # opcode address) that the walker follows.
    _BRANCH_OPS = frozenset((0x04, 0x0B, 0x18) + _CONDITIONAL_JUMPS)

    def __init__(self, sd_bytes):
        """Create a decoder over the raw bytes of an .SD file."""
        self.sd = sd_bytes
        self.sections = {}  # sid -> section record built by decodeSection
        self.visitedHeaders = set()
        self.errors = []

    def readSection(self, sid, count_sectors=1):
        """Return up to count_sectors * 256 bytes starting at sid.

        Returns None when sid is out of range. The slice is clamped to the
        end of the file, so it may be shorter than requested (or empty).
        """
        off = sidToOffset(sid)
        if off is None:
            return None
        end = min(off + count_sectors * SECTION_BYTES, len(self.sd))
        return self.sd[off:end]

    def decodeSection(self, sid, count_sectors, depth=0, source=None):
        """Walk the bytecode for one section, recording ops we encounter.

        Each (sid, count_sectors) pair is decoded at most once. Unreadable
        sections are reported through self.errors instead of raising.

        Args:
            sid: section id to decode.
            count_sectors: number of 256-byte sectors to read.
            depth: current HEADER nesting depth (0 for top-level calls).
            source: free-form tag describing who requested the section.
        """
        key = (sid, count_sectors)
        if key in self.visitedHeaders:
            return
        self.visitedHeaders.add(key)
        data = self.readSection(sid, count_sectors)
        if not data:
            self.errors.append(f"section ${sid:02X} unreadable")
            return
        ops = []
        secInfo = {
            "sid": sid,
            "file_offset": sidToOffset(sid),
            "count_sectors": count_sectors,
            "size_bytes": len(data),
            "source": source,
            "ops": ops,
        }
        # NOTE(review): the visited cache is keyed by (sid, count_sectors) but
        # this dict is keyed by sid alone, so decoding the same sid again with
        # a different sector count replaces the earlier record.
        self.sections[sid] = secInfo
        coverage = self._walk(data, sid, ops, depth)
        secInfo["coverage_bytes"] = coverage["covered"]
        secInfo["unreached_bytes"] = coverage["unreached"]
        secInfo["out_of_range_branches"] = coverage["out_of_range"]

    def _readSignedWord(self, data, off):
        """Read 16-bit LE signed word, or None if out of range."""
        if off + 1 >= len(data):
            return None
        v = data[off] | (data[off + 1] << 8)
        return v - 0x10000 if v >= 0x8000 else v

    def _opLen(self, data, pc):
        """Return byte count for opcode at data[pc], or None to halt walk.

        None is returned past the end of the buffer, on a terminator byte
        (high bit set or value above $45), and for a COM record whose length
        byte is missing.
        """
        if pc >= len(data):
            return None
        op = data[pc]
        if op & 0x80 or op > 0x45:
            return None  # SceneryStreamEnd (terminator)
        meta = OPCODES.get(op)
        if meta is None:
            return 1  # unknown: skip 1 byte, keep walking
        fixed = meta[1]
        if fixed is not None:
            return fixed
        # Variable length:
        if op == 0x1E:
            if pc + 1 >= len(data):
                return None
            # COMRecord advances by `length`, NOT length+2. A length byte of
            # 0 therefore yields a zero-byte op; _walk stops there because
            # the resume address is the already-visited pc.
            return data[pc + 1]
        if op == 0x04:
            # CullByOutcodeList: opcode + 2-byte jump + N vertex-index
            # bytes + 1-byte terminator (high bit set). Per chunk5.s:2019,
            # cursor advances by `4 + N` bytes total.
            n = 3
            while pc + n < len(data) and (data[pc + n] & 0x80) == 0:
                n += 1
            return n + 1  # include terminator byte
        return 1

    def _branchTarget(self, data, pc):
        """Resolve the relative branch encoded in the op at data[pc].

        Returns (target, out_of_range). target is the absolute in-section
        address, or None when the offset could not be read or points outside
        data; out_of_range is True only in the latter case.
        """
        off = self._readSignedWord(data, pc + 1)
        if off is None:
            return None, False
        target = pc + off  # offsets are relative to the opcode address
        if 0 <= target < len(data):
            return target, False
        return None, True

    def _walk(self, data, base_sid, ops, depth):
        """Walk every reachable byte in this section's bytecode.

        Each pc gets visited at most once (visited set). For conditional
        jumps, SubInvoke and CullByOutcodeList both the branch target and the
        fall-through are followed (depth-first); JumpRelative follows the
        target only; Return ends the current path. For HEADER, the referenced
        section is decoded recursively while depth stays below the nesting
        limit.

        Args:
            data: bytes of the section being walked.
            base_sid: id of that section (used for the HEADER source tag).
            ops: list that receives one record per decoded op, in visit order.
            depth: HEADER nesting depth of this section.

        Returns:
            dict with "covered" (bytes examined), "unreached" (remaining
            bytes) and "out_of_range" (branches whose target fell outside
            data).
        """
        size = len(data)
        visited = set()
        worklist = [0]
        outOfRangeBranches = 0
        while worklist:
            pc = worklist.pop()
            if pc in visited or pc < 0 or pc >= size:
                continue
            visited.add(pc)
            op = data[pc]
            n = self._opLen(data, pc)
            if n is None:
                continue  # terminator or unreadable length: end of branch
            entry = {
                "pc": pc,
                "op": op,
                "name": OPCODES.get(op, ("Unknown", 1))[0],
                "bytes": list(data[pc:pc + n]),
            }
            self._annotate(entry, data, pc, base_sid, depth)
            ops.append(entry)
            resume = pc + n
            if op == 0x19:
                # RTS: end of this sub-record (don't fall through).
                continue
            if op == 0x0D:
                # Section load: walk the referenced section. The operand
                # bytes are only read when they lie inside the buffer; a
                # HEADER truncated by the end of the section is recorded but
                # cannot name a section to load.
                if depth < self._MAX_HEADER_DEPTH and pc + 2 < size:
                    self.decodeSection(data[pc + 1], data[pc + 2], depth + 1,
                                       source=f"sid${base_sid:02X}@{pc}")
                # Continue past the HEADER into the dispatcher.
                worklist.append(resume)
            elif op in self._BRANCH_OPS:
                target, outOfRange = self._branchTarget(data, pc)
                if target is not None:
                    worklist.append(target)
                if outOfRange:
                    outOfRangeBranches += 1
                if op != 0x0B:
                    # Everything except the unconditional jump also resumes
                    # after the op. Pushed last so it is explored first.
                    worklist.append(resume)
            else:
                # Default: continue linearly past this op.
                worklist.append(resume)
            # Mark all bytes consumed by this op as visited so the
            # coverage report knows we examined them.
            visited.update(range(pc + 1, min(resume, size)))
        # Compute coverage stats.
        coveredBytes = len(visited)
        return {
            "covered": coveredBytes,
            "unreached": size - coveredBytes,
            "out_of_range": outOfRangeBranches,
        }

    def _annotate(self, entry, data, pc, base_sid, depth):
        """Decode op-specific fields for nicer JSON.

        Adds keys to entry in place. Truncated records (op spilling past the
        section's allocated bytes) skip annotation rather than reading beyond
        the buffer. base_sid and depth are accepted but not used here.
        """
        op = entry["op"]
        b = data
        # Use the canonical opcode length, not the (possibly truncated)
        # captured slice, when deciding bounds.
        meta = OPCODES.get(op)
        if meta is not None and meta[1] is not None:
            opLen = meta[1]
        else:
            opLen = len(entry["bytes"])
        if pc + opLen > len(b):
            return
        if op == 0x0D:
            entry["sid_load"] = b[pc + 1]
            entry["count_sectors"] = b[pc + 2]
            entry["dst_rel"] = b[pc + 3] | (b[pc + 4] << 8)
            entry["flags"] = b[pc + 5]
        elif op == 0x18:
            entry["offset_signed"] = self._readSignedWord(b, pc + 1)
        elif op == 0x12:
            entry["color_code"] = b[pc + 1]
        elif op == 0x05:  # ADF
            entry["freq"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["x"] = sint24(b, pc + 3)
            entry["y"] = sint24(b, pc + 6)
        elif op == 0x1D:  # NAV
            entry["freq"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["x"] = sint24(b, pc + 3)
            entry["y"] = sint24(b, pc + 6)
            entry["z"] = sint16(b, pc + 9)
        elif op == 0x1E:  # COM/airport record
            length = b[pc + 1]
            entry["record_length"] = length
            if length >= 13 and pc + length <= len(b):
                entry["freq"] = b[pc + 2] | (b[pc + 3] << 8)
                entry["x"] = sint24(b, pc + 4)
                entry["y"] = sint24(b, pc + 7)
                entry["z"] = sint24(b, pc + 10)
                # Name occupies bytes 13..length-1 (= length - 13 bytes).
                # Per chunk5.s::SceneryOpCOMRecord, cursor advances by
                # `length` total, so the name is part of the record proper.
                # Names are uppercase ASCII; high bits / control bytes are
                # field-separators. Stop at the first non-printable to avoid
                # pulling in the next record.
                chars = []
                for c in b[pc + 13:pc + length]:
                    if c < 0x20 or c >= 0x7F:
                        break
                    chars.append(chr(c))
                if chars:
                    # NOTE(review): this replaces the opcode mnemonic that
                    # _walk stored under "name" with the station name, so
                    # consumers must identify COM records via entry["op"].
                    entry["name"] = "".join(chars)
        elif op in (0x00, 0x01, 0x02):  # xform-A vertex (X/Y/Z)
            entry["vx"] = sint16(b, pc + 1)
            entry["vy"] = sint16(b, pc + 3)
            entry["vz"] = sint16(b, pc + 5)
        elif op in (0x40, 0x41):  # xform-B vertex (X/Z)
            entry["vx"] = sint16(b, pc + 1)
            entry["vz"] = sint16(b, pc + 3)
        elif op == 0x42:  # cache-fill xform-B
            entry["cache_idx"] = b[pc + 1]
            entry["vx"] = sint16(b, pc + 2)
            entry["vz"] = sint16(b, pc + 4)
        elif op == 0x31:  # cache-fill xform-A
            entry["cache_idx"] = b[pc + 1]
            entry["vx"] = sint16(b, pc + 2)
            entry["vy"] = sint16(b, pc + 4)
            entry["vz"] = sint16(b, pc + 6)
        elif op in (0x32, 0x33, 0x35):
            entry["cache_idx"] = b[pc + 1]
        elif op == 0x2B:  # curve
            entry["v1x"] = sint16(b, pc + 1)
            entry["v1z"] = sint16(b, pc + 3)
            entry["v2x"] = sint16(b, pc + 5)
            entry["v2z"] = sint16(b, pc + 7)
        elif op == 0x0B or op in self._CONDITIONAL_JUMPS:
            entry["jump_rel"] = self._readSignedWord(b, pc + 1)
        elif op == 0x25:
            entry["dst"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["value"] = b[pc + 3] | (b[pc + 4] << 8)
        elif op == 0x1A:
            entry["dst"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["src"] = b[pc + 3] | (b[pc + 4] << 8)
# -- helpers ---------------------------------------------------------------
def sint16(b, off):
    """Return the little-endian signed 16-bit value stored at b[off:off + 2]."""
    low = b[off]
    high = b[off + 1]
    raw = (high << 8) | low
    # Values with the top bit set wrap into the negative range.
    return raw - 0x10000 if raw >= 0x8000 else raw
def sint24(b, off):
    """Return the little-endian signed 24-bit value stored at b[off:off + 3]."""
    low = b[off]
    mid = b[off + 1]
    high = b[off + 2]
    raw = (high << 16) | (mid << 8) | low
    # Values with the top bit set wrap into the negative range.
    return raw - 0x1000000 if raw >= 0x800000 else raw
# -- driver ----------------------------------------------------------------
def main():
    """Command-line entry point: decode one .SD file and emit JSON or a summary.

    Reads the file named on the command line, decodes the dispatcher section
    plus everything reachable from it, then probes every remaining section id
    as a standalone entry point. The result is written as JSON to --output
    (or stdout), or rendered as a human-readable summary with --summary.
    """
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("sd_file", help="path to A2.SD<n> or FS2.1 file")
    p.add_argument("-o", "--output", default=None,
                   help="output JSON (default: stdout)")
    p.add_argument("--summary", action="store_true",
                   help="print human summary instead of JSON")
    args = p.parse_args()

    # Read the whole image up front; the context manager guarantees the
    # handle is closed even if the read fails.
    with open(args.sd_file, "rb") as f:
        sd = f.read()
    if len(sd) != 143360:
        print(f"warning: {args.sd_file} is {len(sd)} bytes, expected 143360",
              file=sys.stderr)

    dec = Decoder(sd)
    # Standard walk: dispatcher + everything HEADER-reachable.
    dec.decodeSection(DISPATCHER_SID, 1, source="bootstrap")

    # Cross-section completeness: also try every sid in 0..$87 as a
    # standalone entry point. Sections that the dispatcher loaded already
    # are skipped up front. Sections that start with non-bytecode (e.g.,
    # 6502 code, sparse zeros, $80 filler) are filtered out by checking the
    # first byte before deciding to walk.
    for sid in range(MAX_SID + 1):
        if sid in dec.sections:
            continue
        off = sidToOffset(sid)
        if off is None or off + 1 >= len(sd):
            continue
        firstByte = sd[off]
        # Skip the all-zeros sparse pattern and anything that is not a known
        # opcode. The OPCODES membership test also rejects terminators
        # ($80+) and the unassigned values above $45.
        if firstByte == 0 or firstByte not in OPCODES:
            continue
        # Walk it; size = 1 sector = 256 bytes (we have no count info).
        dec.decodeSection(sid, 1, source="orphan")

    out = {
        "source": os.path.basename(args.sd_file),
        "size_bytes": len(sd),
        "section_count": len(dec.sections),
        "sections": list(dec.sections.values()),
        "errors": dec.errors,
    }
    if args.summary:
        printSummary(out)
        return
    text = json.dumps(out, indent=2)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(text)
        print(f"wrote {args.output} ({len(text)} bytes)", file=sys.stderr)
    else:
        print(text)
def printSummary(out):
    """Print a human-readable digest of a decode result to stdout.

    Args:
        out: the result dict built by main(); must provide "source",
            "section_count", "errors" and "sections", where every section
            carries an "ops" list of op records with "op" and "name" keys.

    The per-opcode table is keyed by the mnemonic looked up from the opcode
    number rather than by the record's "name" field, because the decoder
    replaces "name" with the station name on COM records; keying on it would
    list station names as opcodes and split the COMRecord count.
    """
    sections = out["sections"]
    print(f"file: {out['source']}")
    print(f"sections: {out['section_count']}")
    print(f"errors: {len(out['errors'])}")

    # Coverage rollup
    totalSize = sum(sec.get("size_bytes", 0) for sec in sections)
    totalCov = sum(sec.get("coverage_bytes", 0) for sec in sections)
    totalOOR = sum(sec.get("out_of_range_branches", 0) for sec in sections)
    pct = 100.0 * totalCov / totalSize if totalSize else 0
    print(f"coverage: {totalCov}/{totalSize} bytes ({pct:.1f}%) reached by walker")
    print(f"out-of-range branches: {totalOOR} (jumps that landed outside section data)")

    # Tallies per opcode
    polyOps = (0x00, 0x01, 0x02, 0x40, 0x41, 0x32, 0x33, 0x35, 0x2B)
    cullOps = (0x13, 0x14, 0x20, 0x21, 0x22, 0x23, 0x28)
    stationOps = {0x05: "ADF", 0x1D: "NAV", 0x1E: "COM"}
    op_count = {}
    poly_emits = 0
    stations = {"ADF": 0, "NAV": 0, "COM": 0}
    cull_ops = 0
    header_count = 0
    for sec in sections:
        for op in sec["ops"]:
            code = op["op"]
            meta = OPCODES.get(code)
            # Fall back to the recorded name for opcodes missing from the
            # table (the decoder labels those "Unknown").
            mnemonic = meta[0] if meta else op["name"]
            op_count[mnemonic] = op_count.get(mnemonic, 0) + 1
            if code in polyOps:
                poly_emits += 1
            if code in stationOps:
                stations[stationOps[code]] += 1
            if code in cullOps:
                cull_ops += 1
            if code == 0x0D:
                header_count += 1
    print(f"polygon emits: {poly_emits}")
    print(f"stations: ADF={stations['ADF']} NAV={stations['NAV']} COM={stations['COM']}")
    print(f"culls: {cull_ops}")
    print(f"sub-section loads ($0D HEADER): {header_count}")
    print()
    print("opcode counts (top 20):")
    # reverse=True keeps the sort stable, so ties stay in first-seen order.
    ranked = sorted(op_count.items(), key=lambda kv: kv[1], reverse=True)
    for name, n in ranked[:20]:
        print(f" {name:32s} {n}")
# Script entry point: run the extractor only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()