#!/usr/bin/env python3
"""
SceneryExtract -- decode FS2 .SD scenery files into a polygon database.

Walks the chunk5 bytecode interpreter logic offline: starts at sid $2C
(the dispatcher), follows every $0D HEADER (loads sub-sections), every
$18 SUB_INVOKE, and every conditional/unconditional jump. Records:

  - polygon-emit ops ($00/$01/$02 xform-A, $40/$41 xform-B,
    $32/$33 cached-vertex, $35 cached-plot, $2B curve)
  - cache-fill ops ($31, $42)
  - station records ($05 ADF, $1D NAV, $1E COM)
  - frame-control ops ($07 EnterLocalFrame, $24 PushOriginWithStash)

Output: JSON containing every visited section with its decoded ops, plus
a flat list of all polygon vertices (for downstream renderers that don't
want to interpret the VM themselves).
"""

import argparse
import json
import os
import sys

# NOTE(review): the module docstring (also used as the argparse description)
# mentions a flat list of polygon vertices, but the JSON built in main()
# only contains "source", "size_bytes", "section_count", "sections" and
# "errors".  Either the list is produced elsewhere or it is still TODO.

# -- file-offset arithmetic ------------------------------------------------

SECTION_BYTES = 256       # one chunk5 "sector" = 256 bytes
SECTOR_PER_BLOCK = 2      # 2 sectors per ProDOS 512-byte block
MAX_SID = 0x87            # ComputeBlockFromSector rejects sid >= $88
DISPATCHER_SID = 0x2C     # bootstrap (LA619) sid; common to all files

EXPECTED_SD_BYTES = 143360  # size main() expects; other sizes only warn
MAX_HEADER_DEPTH = 6        # HEADER loads are not followed at this depth or deeper

# Opcodes that carry a 16-bit relative jump at cursor+1..+2 and may either
# take it or fall through (the cull / compare family).
_COND_JUMP_OPS = (0x13, 0x14, 0x20, 0x21, 0x22, 0x23, 0x28)

# Opcodes for which the walker explores BOTH the relative target and the
# fall-through: conditional jumps, $18 SubInvoke (call + resume point) and
# $04 CullByOutcodeList (full-cull jump or fall-through).
_FORK_OPS = frozenset(_COND_JUMP_OPS + (0x04, 0x18))


def sidToOffset(sid):
    """Return .SD file byte offset for a section id, or None if out of range.

    Empirically verified: FS2.1's dispatcher (sid $2C) lives at file offset
    49152, which matches MAME's captured RAM at $A800. The formula:

        file_offset = (((sid >> 2) + 1) * 16 + (sid & 3)) * 256

    The runtime path (chunk4.s::FetchSectorFromDisk + ReadBlocks via
    SmartPort) does additional shifts to convert this into a ProDOS block#
    and looks up the actual disk block via an in-memory table seeded from
    the ProDOS file index. For an offline extractor operating on the .SD
    (which is the file content in logical block order), the formula above
    directly indexes the file.
    """
    if sid < 0 or sid > MAX_SID:
        return None
    return (((sid >> 2) + 1) * 16 + (sid & 3)) * SECTION_BYTES


# -- opcode metadata -------------------------------------------------------

# Each entry: (mnemonic, fixed-byte-count or None for variable)
OPCODES = {
    0x00: ("EmitV1XformAndPlot", 7),      # xform-A V1 + plot pixel
    0x01: ("EmitV1Xform80C5", 7),         # xform-A V1 silent
    0x02: ("EmitV2Xform80C5", 7),         # xform-A V2 + draw
    0x03: ("Call64K_2", 6),               # 64K callback #2
    0x04: ("CullByOutcodeList", None),    # variable (terminator-driven)
    0x05: ("ADFRecord", 9),               # freq + 24-bit XYZ
    0x06: ("DrawLine", 5),                # 2D screen-coord line
    0x07: ("EnterLocalFrame", 14),        # set local frame anchor
    0x08: ("Invalid08", 1),
    0x09: ("Skip3", 3),
    0x0A: ("Skip3b", 3),
    0x0B: ("JumpRelative", 3),            # 16-bit signed offset
    0x0C: ("Invalid0C", 1),
    0x0D: ("Header", 6),                  # SECTION-LOAD opcode
    0x0E: ("Call64K", 1),                 # no-op in 48K mode
    0x0F: ("Invalid0F", 1),
    0x10: ("Invalid10", 1),
    0x11: ("Skip1", 1),
    0x12: ("SetColor", 2),
    0x13: ("JumpIfBeyondXY", 9),          # cull XY-plane
    0x14: ("JumpIfBeyondXYZ", 11),        # cull XYZ
    0x15: ("Invalid15", 1),
    0x16: ("Invalid16", 1),
    0x17: ("Invalid17", 1),
    0x18: ("SubInvoke", 3),               # JSR-style relative
    0x19: ("Return", 1),                  # RTS
    0x1A: ("WriteWord", 5),               # *dst = *src
    0x1B: ("ModeWhite", 1),
    0x1C: ("DayOnly", 1),
    0x1D: ("NAVRecord", 11),              # freq + XYZ + 16-bit Z
    0x1E: ("COMRecord", None),            # variable; len in cursor[1]
    0x1F: ("Invalid1F", 1),
    0x20: ("CullIfOutside1", 9),
    0x21: ("CullIfOutside2", 15),
    0x22: ("CullIfOutside3", 21),
    0x23: ("JumpIfBitsClear", 7),
    0x24: ("PushOriginWithStash", 8),
    0x25: ("StoreImmWord", 5),
    0x26: ("Invalid26", 1),
    0x27: ("Invalid27", 1),
    0x28: ("JumpIfWordCompare", 9),
    0x29: ("CopyToD2", 1),
    0x2A: ("Invalid2A", 1),
    0x2B: ("EmitCurve", 9),
    0x2C: ("Invalid2C", 1),
    0x2D: ("Invalid2D", 1),
    0x2E: ("Invalid2E", 1),
    0x2F: ("ResetState", 1),
    0x30: ("Invalid30", 1),
    0x31: ("RefreshCachedXform80C5", 8),
    0x32: ("VertexCachedV1", 2),
    0x33: ("VertexCachedV2", 2),
    0x34: ("Invalid34", 1),
    0x35: ("VertexCachedDraw", 2),
    0x40: ("EmitV1Xform7EBC", 5),
    0x41: ("EmitV2Xform7EBC", 5),
    0x42: ("RefreshCachedXform7EBC", 6),
}


# -- bytecode decoder ------------------------------------------------------

# We DON'T evaluate culls (we want to see ALL polygons).  We DO evaluate:
#   - $0D HEADER: load and walk the referenced section
#   - $18 SUB_INVOKE: walk the referenced sub-record AND the resume point
#   - $19 RTS: terminate the current branch of the walk
#   - jumps: an unconditional jump follows its target only; conditional
#     jumps explore BOTH the target and the fall-through
# This biases toward the WORST CASE of geometry the file describes,
# which is what we want for an offline asset extraction.

class Decoder:
    """Offline walker for the scenery bytecode stored in a .SD image.

    Attributes:
        sd: the raw file content (a bytes-like object).
        sections: sid -> section-info dict (see decodeSection).
        visitedHeaders: set of (sid, count_sectors) pairs already decoded.
        errors: human-readable messages for problems met during the walk.
    """

    def __init__(self, sd_bytes):
        self.sd = sd_bytes
        self.sections = {}            # sid -> {"sid": ..., "ops": [...], ...}
        self.visitedHeaders = set()
        self.errors = []

    def readSection(self, sid, count_sectors=1):
        """Return up to count_sectors * 256 bytes starting at sid.

        Returns None when sid is outside 0..MAX_SID.  The result is
        shorter than requested (possibly empty) when the file ends early.
        """
        start = sidToOffset(sid)
        if start is None:
            return None
        stop = min(start + count_sectors * SECTION_BYTES, len(self.sd))
        return self.sd[start:stop]

    def decodeSection(self, sid, count_sectors, depth=0, source=None):
        """Walk the bytecode for one section, recording ops we encounter.

        Each (sid, count_sectors) pair is decoded at most once.  On
        success a section-info dict is stored in self.sections[sid]; an
        unreadable or empty section only appends to self.errors.

        Args:
            sid: section id to decode.
            count_sectors: number of 256-byte sectors to read.
            depth: current HEADER nesting depth (0 for entry points).
            source: free-form label describing who requested the load.
        """
        key = (sid, count_sectors)
        if key in self.visitedHeaders:
            return
        # Recorded before reading, so a failed load is not retried either.
        self.visitedHeaders.add(key)

        data = self.readSection(sid, count_sectors)
        if data is None or len(data) == 0:
            self.errors.append(f"section ${sid:02X} unreadable")
            return

        ops = []
        secInfo = {
            "sid": sid,
            "file_offset": sidToOffset(sid),
            "count_sectors": count_sectors,
            "size_bytes": len(data),
            "source": source,
            "ops": ops,
        }
        # Registered before walking so a parent section precedes the
        # sub-sections it loads in self.sections' insertion order.
        # NOTE(review): keyed by sid alone while the visited cache is keyed
        # by (sid, count_sectors); loading the same sid again with another
        # count replaces this entry -- confirm that is intended.
        self.sections[sid] = secInfo

        coverage = self._walk(data, sid, ops, depth)
        secInfo["coverage_bytes"] = coverage["covered"]
        secInfo["unreached_bytes"] = coverage["unreached"]
        secInfo["out_of_range_branches"] = coverage["out_of_range"]

    def _readSignedWord(self, data, off):
        """Read 16-bit LE signed word, or None if out of range."""
        if off + 1 >= len(data):
            return None
        word = data[off] | (data[off + 1] << 8)
        return word - 0x10000 if word >= 0x8000 else word

    def _opLen(self, data, pc):
        """Return byte count for opcode at data[pc], or None to halt walk.

        None is returned when pc is past the end of data, when the byte is
        a stream terminator (high bit set, or above $45), or when a
        COMRecord's length byte lies beyond the buffer.
        """
        if pc >= len(data):
            return None
        op = data[pc]
        if op & 0x80 or op > 0x45:
            return None                   # SceneryStreamEnd (terminator)

        meta = OPCODES.get(op)
        if meta is None:
            return 1                      # unknown: skip 1 byte, keep walking
        fixed = meta[1]
        if fixed is not None:
            return fixed

        # Variable length:
        if op == 0x1E:
            if pc + 1 >= len(data):
                return None
            # COMRecord advances by `length`, NOT length+2.  A length byte
            # of 0 therefore yields 0, which makes _walk stop this branch
            # because pc + 0 has already been visited.
            return data[pc + 1]
        if op == 0x04:
            # CullByOutcodeList: opcode + 2-byte jump + N vertex-index
            # bytes + 1-byte terminator (high bit set).  Per chunk5.s:2019,
            # cursor advances by `4 + N` bytes total.
            n = 3
            while pc + n < len(data) and (data[pc + n] & 0x80) == 0:
                n += 1
            return n + 1                  # include terminator byte
        return 1

    def _walk(self, data, base_sid, ops, depth):
        """Walk every reachable byte in this section's bytecode.

        Each pc gets visited at most once (visited set).  For conditional
        jumps, SubInvoke and CullByOutcodeList, both the branch target and
        the fall-through are followed (depth-first; the fall-through is
        explored first because the worklist is a stack).  For HEADER, the
        referenced section is loaded recursively while depth is below
        MAX_HEADER_DEPTH.

        Args:
            data: the section's bytes.
            base_sid: sid of the section being walked (for labels).
            ops: list that decoded op entries are appended to.
            depth: current HEADER nesting depth.

        Returns:
            dict with "covered" (bytes examined), "unreached"
            (len(data) - covered) and "out_of_range" (branch targets that
            fell outside data).
        """
        size = len(data)
        visited = set()
        worklist = [0]
        outOfRangeBranches = 0

        while worklist:
            pc = worklist.pop()
            if pc in visited or pc < 0 or pc >= size:
                continue
            visited.add(pc)

            op = data[pc]
            if op & 0x80 or op > 0x45:
                continue                  # terminator: end of this branch
            n = self._opLen(data, pc)
            if n is None:
                continue

            entry = {
                "pc": pc,
                "op": op,
                "name": OPCODES.get(op, ("Unknown", 1))[0],
                "bytes": list(data[pc:pc + n]),
            }
            self._annotate(entry, data, pc, base_sid, depth)
            ops.append(entry)

            if op == 0x19:
                # RTS: end of this sub-record (don't fall through).  It is
                # a one-byte op, so there are no operand bytes to mark.
                continue

            if op == 0x0D and depth < MAX_HEADER_DEPTH:
                # Section load: walk the referenced section.  The sid and
                # sector-count operands may be cut off by the end of the
                # section; reading them unguarded would raise IndexError.
                if pc + 2 < size:
                    self.decodeSection(data[pc + 1], data[pc + 2], depth + 1,
                                       source=f"sid${base_sid:02X}@{pc}")
                else:
                    self.errors.append(
                        f"section ${base_sid:02X}: truncated HEADER at pc {pc}")
                # Continue past the HEADER into the dispatcher.
                worklist.append(pc + n)
            elif op == 0x0B or op in _FORK_OPS:
                # Relative branch: a 16-bit signed offset relative to the
                # opcode address, stored at cursor+1..+2.
                off = self._readSignedWord(data, pc + 1)
                if off is not None:
                    target = pc + off
                    if 0 <= target < size:
                        worklist.append(target)
                    else:
                        outOfRangeBranches += 1
                if op != 0x0B:
                    # Everything except the unconditional jump may also
                    # fall through; pushed last so it is popped first.
                    worklist.append(pc + n)
            else:
                # Default: continue linearly past this op.
                worklist.append(pc + n)

            # Mark all bytes consumed by this op as visited so the
            # coverage report knows we examined them.
            visited.update(range(pc + 1, min(pc + n, size)))

        # Compute coverage stats.
        coveredBytes = len(visited)
        return {
            "covered": coveredBytes,
            "unreached": size - coveredBytes,
            "out_of_range": outOfRangeBranches,
        }

    def _annotate(self, entry, data, pc, base_sid, depth):
        """Decode op-specific fields for nicer JSON.

        Truncated records (op spilling past section's allocated bytes)
        skip annotation rather than reading beyond the buffer.  The entry
        dict is modified in place; base_sid and depth are accepted for
        signature symmetry with _walk but are not used here.
        """
        op = entry["op"]
        b = data

        # Use the canonical opcode length, not the (possibly truncated)
        # captured slice, when deciding bounds.
        meta = OPCODES.get(op)
        opLen = meta[1] if meta and meta[1] is not None else len(entry["bytes"])
        if pc + opLen > len(data):
            return

        if op == 0x0D:
            entry["sid_load"] = b[pc + 1]
            entry["count_sectors"] = b[pc + 2]
            entry["dst_rel"] = b[pc + 3] | (b[pc + 4] << 8)
            entry["flags"] = b[pc + 5]
        elif op == 0x18:
            entry["offset_signed"] = sint16(b, pc + 1)
        elif op == 0x12:
            entry["color_code"] = b[pc + 1]
        elif op == 0x05:                  # ADF
            entry["freq"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["x"] = sint24(b, pc + 3)
            entry["y"] = sint24(b, pc + 6)
        elif op == 0x1D:                  # NAV
            entry["freq"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["x"] = sint24(b, pc + 3)
            entry["y"] = sint24(b, pc + 6)
            entry["z"] = sint16(b, pc + 9)
        elif op == 0x1E:                  # COM/airport record
            length = b[pc + 1]
            entry["record_length"] = length
            if length >= 13 and pc + length <= len(b):
                entry["freq"] = b[pc + 2] | (b[pc + 3] << 8)
                entry["x"] = sint24(b, pc + 4)
                entry["y"] = sint24(b, pc + 7)
                entry["z"] = sint24(b, pc + 10)
                # Name occupies bytes 13..length-1 (= length - 13 bytes).
                # Per chunk5.s::SceneryOpCOMRecord, cursor advances by
                # `length` total, so the name is part of the record proper.
                if length > 13:
                    # Names are uppercase ASCII; high bits / control
                    # bytes are field-separators.  Stop at the first
                    # non-printable to avoid pulling in the next record.
                    chars = []
                    for c in b[pc + 13:pc + length]:
                        if c < 0x20 or c >= 0x7F:
                            break
                        chars.append(chr(c))
                    if chars:
                        entry["name"] = "".join(chars)
        elif op in (0x00, 0x01, 0x02):    # xform-A vertex (X/Y/Z)
            entry["vx"] = sint16(b, pc + 1)
            entry["vy"] = sint16(b, pc + 3)
            entry["vz"] = sint16(b, pc + 5)
        elif op in (0x40, 0x41):          # xform-B vertex (X/Z)
            entry["vx"] = sint16(b, pc + 1)
            entry["vz"] = sint16(b, pc + 3)
        elif op == 0x42:                  # cache-fill xform-B
            entry["cache_idx"] = b[pc + 1]
            entry["vx"] = sint16(b, pc + 2)
            entry["vz"] = sint16(b, pc + 4)
        elif op == 0x31:                  # cache-fill xform-A
            entry["cache_idx"] = b[pc + 1]
            entry["vx"] = sint16(b, pc + 2)
            entry["vy"] = sint16(b, pc + 4)
            entry["vz"] = sint16(b, pc + 6)
        elif op in (0x32, 0x33, 0x35):
            entry["cache_idx"] = b[pc + 1]
        elif op == 0x2B:                  # curve
            entry["v1x"] = sint16(b, pc + 1)
            entry["v1z"] = sint16(b, pc + 3)
            entry["v2x"] = sint16(b, pc + 5)
            entry["v2z"] = sint16(b, pc + 7)
        elif op in _COND_JUMP_OPS or op == 0x0B:
            # Conditional and unconditional jumps share the same layout:
            # a signed 16-bit relative offset right after the opcode.
            entry["jump_rel"] = sint16(b, pc + 1)
        elif op == 0x25:
            entry["dst"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["value"] = b[pc + 3] | (b[pc + 4] << 8)
        elif op == 0x1A:
            entry["dst"] = b[pc + 1] | (b[pc + 2] << 8)
            entry["src"] = b[pc + 3] | (b[pc + 4] << 8)


# -- helpers ---------------------------------------------------------------

def sint16(b, off):
    """Return the little-endian signed 16-bit value at b[off..off+1].

    Raises IndexError when the two bytes are not both inside b.
    """
    v = b[off] | (b[off + 1] << 8)
    if v >= 0x8000:
        v -= 0x10000
    return v


def sint24(b, off):
    """Return the little-endian signed 24-bit value at b[off..off+2].

    Raises IndexError when the three bytes are not all inside b.
    """
    v = b[off] | (b[off + 1] << 8) | (b[off + 2] << 16)
    if v >= 0x800000:
        v -= 0x1000000
    return v


# -- driver ----------------------------------------------------------------

def main():
    """Command-line entry point: decode one .SD file to JSON or a summary."""
    p = argparse.ArgumentParser(description=__doc__)
    p.add_argument("sd_file", help="path to A2.SD or FS2.1 file")
    p.add_argument("-o", "--output", default=None,
                   help="output JSON (default: stdout)")
    p.add_argument("--summary", action="store_true",
                   help="print human summary instead of JSON")
    args = p.parse_args()

    # Context manager so the input handle is closed deterministically.
    with open(args.sd_file, "rb") as f:
        sd = f.read()
    if len(sd) != EXPECTED_SD_BYTES:
        print(f"warning: {args.sd_file} is {len(sd)} bytes, expected {EXPECTED_SD_BYTES}",
              file=sys.stderr)

    dec = Decoder(sd)
    # Standard walk: dispatcher + everything HEADER-reachable.
    dec.decodeSection(DISPATCHER_SID, 1, source="bootstrap")

    # Cross-section completeness: also try every sid in 0..$87 as a
    # standalone entry point.  Sections that the dispatcher loaded already
    # are skipped.  Sections that start with non-bytecode (e.g., 6502 code,
    # sparse zeros, $80 filler) are filtered out by checking the first byte
    # before deciding to walk.
    for sid in range(MAX_SID + 1):
        if sid in dec.sections:
            continue
        off = sidToOffset(sid)
        if off is None or off + 1 >= len(sd):
            continue
        firstByte = sd[off]
        # Skip obvious non-bytecode.  Terminators ($80+), $FF filler and
        # the $46..$7F range are all absent from OPCODES, so one membership
        # test covers them; the all-zeros sparse pattern is rejected
        # explicitly because $00 IS a valid opcode.
        if firstByte == 0 or firstByte not in OPCODES:
            continue
        # Walk it; size = 1 sector = 256 bytes (we have no count info).
        dec.decodeSection(sid, 1, source="orphan")

    out = {
        "source": os.path.basename(args.sd_file),
        "size_bytes": len(sd),
        "section_count": len(dec.sections),
        "sections": list(dec.sections.values()),
        "errors": dec.errors,
    }

    if args.summary:
        printSummary(out)
        return

    text = json.dumps(out, indent=2)
    if args.output:
        with open(args.output, "w") as f:
            f.write(text)
        print(f"wrote {args.output} ({len(text)} bytes)", file=sys.stderr)
    else:
        print(text)


def printSummary(out):
    """Print a human-readable digest of the dict assembled by main().

    Args:
        out: dict with "source", "section_count", "errors" and "sections"
            keys, where each section carries an "ops" list.
    """
    print(f"file: {out['source']}")
    print(f"sections: {out['section_count']}")
    print(f"errors: {len(out['errors'])}")

    # Coverage rollup
    totalSize = 0
    totalCov = 0
    totalOOR = 0
    for sec in out["sections"]:
        totalSize += sec.get("size_bytes", 0)
        totalCov += sec.get("coverage_bytes", 0)
        totalOOR += sec.get("out_of_range_branches", 0)
    pct = 100.0 * totalCov / totalSize if totalSize else 0
    print(f"coverage: {totalCov}/{totalSize} bytes ({pct:.1f}%) reached by walker")
    print(f"out-of-range branches: {totalOOR} (jumps that landed outside section data)")

    # Tallies per opcode
    op_count = {}
    poly_emits = 0
    stations = {"ADF": 0, "NAV": 0, "COM": 0}
    cull_ops = 0
    header_count = 0
    station_by_op = {0x05: "ADF", 0x1D: "NAV", 0x1E: "COM"}
    poly_ops = (0x00, 0x01, 0x02, 0x40, 0x41, 0x32, 0x33, 0x35, 0x2B)
    for sec in out["sections"]:
        for op in sec["ops"]:
            code = op["op"]
            op_count[op["name"]] = op_count.get(op["name"], 0) + 1
            if code in poly_ops:
                poly_emits += 1
            if code in station_by_op:
                stations[station_by_op[code]] += 1
            if code in _COND_JUMP_OPS:
                cull_ops += 1
            if code == 0x0D:
                header_count += 1

    print(f"polygon emits: {poly_emits}")
    print(f"stations: ADF={stations['ADF']} NAV={stations['NAV']} COM={stations['COM']}")
    print(f"culls: {cull_ops}")
    print(f"sub-section loads ($0D HEADER): {header_count}")
    print()
    print("opcode counts (top 20):")
    # Stable sort: ties keep first-seen order.
    for name, n in sorted(op_count.items(), key=lambda x: -x[1])[:20]:
        print(f" {name:32s} {n}")


if __name__ == "__main__":
    main()