#!/usr/bin/python3 # # 86Box A hypervisor and IBM PC system emulator that specializes in # running old operating systems and software designed for IBM # PC systems and compatibles from 1981 through fairly recent # system designs based on the PCI bus. # # This file is part of the 86Box BIOS Tools distribution. # # BIOS and archive extraction classes. # # # # Authors: RichardG, # # Copyright 2021 RichardG. # import array, codecs, datetime, io, itertools, math, os, re, shutil, socket, struct, subprocess, sys, time try: import PIL.Image except ImportError: PIL = lambda x: x PIL.Image = None from . import util class MultifileStaleException(Exception): """Exception raised by Extractor.multifile_lock_acquire() if the file has gone missing after the multi-file lock was acquired.""" pass class Extractor: def __init__(self): self.debug = True self.multifile_locked = False def extract(self, file_path, file_header, dest_dir, dest_dir_0): """Extract the given file into one of the destination directories: dest_dir allows extracted files to be reprocessed in the next run, while dest_dir_0 does not. This must return either: - False if this extractor can't handle the given file - True if this extractor can handle the given file, but no output was produced - a string with the produced output file/directory path""" raise NotImplementedError() def debug_print(self, *args): """Print a log line if debug output is enabled.""" print(self.__class__.__name__ + ':', *args, file=sys.stderr) def multifile_lock_acquire(self, file_path): """Acquire the global multi-file lock. The lock is automatically released by the main module after extract() returns or raises an exception.""" self.multifile_lock.acquire() self.multifile_locked = True # Raise the special exception if another extractor already processed this file. try: return os.path.getsize(file_path) except: raise MultifileStaleException() class ApricotExtractor(Extractor): """Extract Apricot BIOS recovery files. Only one instance of this format (Trimond Trent) has been observed, let us know if you find any other!""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Apricot version signature. self._apricot_pattern = re.compile(b'''@\\(#\\)Apricot ''') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if this isn't a slightly-bigger-than-power-of-two file. # The only observed file has a 2071-byte header. try: file_size = os.path.getsize(file_path) except: return False if file_size < 4096: return False pow2 = 1 << math.floor(math.log2(file_size)) if file_size <= pow2 or file_size > pow2 + 4096: return False # Look for the Apricot signature as a safety net. if not self._apricot_pattern.search(file_header): return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Separate payload and header. try: # Open Apricot file. in_f = open(file_path, 'rb') # Read header. header = in_f.read(file_size - pow2) # Copy payload. try: out_f = open(os.path.join(dest_dir, 'apricot.bin'), 'wb') data = b' ' while data: data = in_f.read(1048576) out_f.write(data) out_f.close() except: in_f.close() return True # Write header. try: out_f = open(os.path.join(dest_dir, ':header:'), 'wb') out_f.write(header) out_f.close() except: pass # Remove Apricot file. in_f.close() os.remove(file_path) except: pass # Return destination directory path. return dest_dir class ArchiveExtractor(Extractor): """Extract known archive types.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Known signatures for archive files. self._signature_pattern = re.compile( b'''PK\\x03\\x04|''' # zip b'''Rar!\\x1A\\x07|''' # rar b'''7z\\xBC\\xAF\\x27\\x1C|''' # 7z b'''MSCF|''' # cab b'''\\x1F\\x8B|''' # gzip b'''BZh|''' # bzip2 b'''\\xFD7zXZ\\x00|''' # xz b'''LHA\\x20|''' # lha b'''ZOO''' # zoo ) # /dev/null handle for suppressing output. self._devnull = open(os.devnull, 'wb') # 7-Zip has this annoying quirk where it scans the archive's parent # directory structure before extracting the archive itself. This # takes a very long time if any of the parent directories has a lot # of files. Therefore, we try to find a location as close to / as # possible, so we can symlink the archive there and make that parent # scan as quick as possible. Igor recognizes this is an inefficiency # in p7zip, but even the native Linux 7-Zip 21.07 still has it...? dirs = [] my_file_path = os.path.abspath(__file__) for dir_path in (os.path.dirname(my_file_path), os.getcwd(), '/tmp', '/run/user/' + str(hasattr(os, 'getuid') and os.getuid() or 0)): # Get file count for all levels of the path. levels = [] while True: try: list_len = len(os.listdir(dir_path)) except: list_len = 2 ** 32 levels.append((dir_path, list_len)) parent_dir_path = os.path.dirname(dir_path) if parent_dir_path == dir_path: break dir_path = parent_dir_path # Go through levels in ascending (therefore closest to /) order. levels.sort() total_count = 0 for level_dir, level_count in levels: total_count += level_count dirs.append((level_dir, total_count)) # Remove duplicates and sort by total children count. dirs = list(set(dirs)) dirs.sort(key=lambda x: (x[1], x[0])) # See where we can create a symlink. temp_file_name = 'biostools_{0}_{1}_{2}'.format(socket.gethostname(), hex(os.getpid())[2:], hex(id(self))[2:]) self._temp_paths = [] for dir_path, dir_children in dirs: # Test symlink creation. link_path = os.path.join(dir_path, temp_file_name) try: # Create symlink and check if it was actually created. os.symlink(my_file_path, link_path) if os.readlink(link_path) == my_file_path: # Test passed, add to temporary path list. self._temp_paths.append(link_path) except: pass # Remove any created symlink. try: os.remove(link_path) except: pass def extract(self, file_path, file_header, dest_dir, dest_dir_0): """Extract an archive.""" # Stop if this is apparently not an archive. match = self._signature_pattern.match(file_header) if not match: return False # Do the actual extraction. return self._extract_archive(file_path, dest_dir) def _extract_archive(self, file_path, dest_dir, remove=True): # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Try creating temporary symlink with the archive's extension. file_path_abs = os.path.abspath(file_path) _, ext = os.path.splitext(file_path_abs) link_path = file_path_abs for temp_path in self._temp_paths: temp_path_ext = temp_path + ext try: # Create symlink and check if it was actually created. os.symlink(file_path_abs, temp_path_ext) if os.readlink(temp_path_ext) == file_path_abs: # Test passed, make this link the new path. link_path = temp_path_ext break else: # Remove link if it was created. os.remove(temp_path_ext) except: pass # Run 7z command to extract the archive. # The dummy password prevents any password prompts from stalling 7z. subprocess.run(['7z', 'x', '-y', '-aou', '-ppassword', '--', link_path], stdout=self._devnull, stderr=subprocess.STDOUT, cwd=dest_dir) # Remove temporary symlink. if link_path != file_path_abs: while os.path.islink(link_path): try: os.remove(link_path) except: break # Assume failure if nothing was extracted. files_extracted = os.listdir(dest_dir) if len(files_extracted) < 1: self.debug_print('Extraction produced no files:', file_path) return False # Rename single file. (gzip/bzip2/etc.) if len(files_extracted) == 1 and link_path != file_path_abs: link_name = os.path.splitext(os.path.basename(link_path))[0] if files_extracted[0][:len(link_name)] == link_name: try: shutil.move(os.path.join(dest_dir, files_extracted[0]), os.path.join(dest_dir, os.path.splitext(os.path.basename(file_path))[0] + files_extracted[0][len(link_name):])) except: pass # Remove archive file. if remove: try: os.remove(file_path) except: pass # Return destination directory path. return dest_dir class ASTExtractor(Extractor): """Extract AST BIOS flash floppy images. These appear to contain a specially crafted FAT filesystem, likely with static sector offsets for the payload, so we work on the entire image before FATExtractor has a chance to claim it.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # AST flash signature. self._ast_start_pattern = re.compile(b'''This is a flash update from AST Research, Inc\\.''') self._ast_payload_pattern = re.compile(b'''AST FLASH UPDATE''') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if this file is too small. if os.path.getsize(file_path) <= 0x9083: return False # Look for the AST signatures. if not self._ast_start_pattern.match(file_header[0x4200:0x422e]): return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Open AST image. with open(file_path, 'rb') as in_f: # Skip the initial 72 sectors. in_f.seek(0x9000) # Copy payload. header = b'' dest_file_path = os.path.join(dest_dir, 'ast.bin') try: with open(dest_file_path, 'wb') as out_f: data = remaining = True while data and remaining > 0: payload_size = 15 * 512 if data == True: # Check the header on the first payload sector. header += in_f.read(0x83) if not self._ast_payload_pattern.match(header[:0x10]): raise Exception('missing header') # Subtract header from payload. remaining, = struct.unpack('\\|]''' self._phoenixnet_workaround_pattern = re.compile( fn + b'''(?:\\x00{7}|''' + fn + b'''(?:\\x00{6}|''' + fn + b'''(?:\\x00{5}|''' + fn + b'''(?:\\x00{4}|''' + fn + b'''(?:\\x00{3}|''' + fn + b'''(?:\\x00{2}|''' + fn + b'''(?:\\x00{1}|''' + fn + b''')))))))''' + fn + b'''(?:\\x00{2}|''' + fn + b'''(?:\\x00{1}|''' + fn + b'''))''' ) # Path to the bios_extract utility. self._bios_extract_path = os.path.abspath(os.path.join('bios_extract', 'bios_extract')) if not os.path.exists(self._bios_extract_path): self._bios_extract_path = None def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if bios_extract is not available. if not self._bios_extract_path: return False # Read up to 16 MB as a safety net. file_header += util.read_complement(file_path, file_header) # Stop if no BIOS signatures are found. if not self._entrypoint_pattern.match(file_header[-16:]) and not self._signature_pattern.search(file_header): return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir_0): return True # Start bios_extract process. file_path_abs = os.path.abspath(file_path) try: proc = subprocess.run([self._bios_extract_path, file_path_abs], timeout=30, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dest_dir_0) except: # Bad data can cause infinite loops. proc = None self.debug_print('Processing timed out on:', file_path) if proc and proc.returncode not in (0, 1, 86): self.debug_print('Bad return code:', proc.returncode) # Assume failure if nothing was extracted. A lone remainder file also counts as a failure. dest_dir_files = os.listdir(dest_dir_0) num_files_extracted = len(dest_dir_files) if num_files_extracted < 1: self.debug_print('Extraction produced no files:', file_path) return False elif num_files_extracted == 1 and dest_dir_files[0] == 'remainder.rom': # Remove remainder file so that the destination directory can be rmdir'd later. self.debug_print('Extraction only produced remainder file:', file_path) util.remove_all(dest_dir_files, lambda x: os.path.join(dest_dir_0, x)) return False elif proc and proc.returncode == 86: # We received the magic exit code that tells us the Intel pipeline found # an option ROM but not the main body. This could indicate a non-Intel # BIOS with LH5-compressed option ROMs. Check the files just in case. have_intelopt = have_intelbody = False for dest_dir_file in dest_dir_files: if dest_dir_file[:9] in ('intelopt_', 'intelunk_'): have_intelopt = True elif dest_dir_file[:10] == 'intelbody_': have_intelbody = True break if have_intelopt and not have_intelbody: # Remove all files so that the destination directory can be rmdir'd later. self.debug_print('Extraction produced Intel option ROM without main body:', file_path) util.remove_all(dest_dir_files, lambda x: os.path.join(dest_dir_0, x)) return False # A missing remainder.rom may indicate an extraction interrupted by a segfault # or something else gone wrong. Copy the original file to its place for safety. if 'remainder.rom' not in dest_dir_files: self.debug_print('Creating remainder stand-in for:', file_path) util.hardlink_or_copy(file_path, os.path.join(dest_dir_0, 'remainder.rom')) # Remove extraneous files containing Intel body remains. (Batman's Revenge 04/15/1994) if not proc or b'intelbody_' in proc.stdout: intel_bodies = [dest_dir_file for dest_dir_file in dest_dir_files if dest_dir_file[:10] == 'intelbody_'] if len(intel_bodies) > 1: # Get size for all body files. for x in range(len(intel_bodies)): try: body_size = os.path.getsize(os.path.join(dest_dir_0, intel_bodies[x])) except: body_size = 0 intel_bodies[x] = (body_size, intel_bodies[x]) # Remove all but the largest body file. intel_bodies.sort(reverse=True) self.debug_print('Keeping Intel body file', intel_bodies[0], 'and discarding', intel_bodies[1:]) util.remove_all(intel_bodies[1:], lambda x: os.path.join(dest_dir_0, x[1])) # Remove removed files from file list. for _, body_name in intel_bodies[1:]: dest_dir_files.remove(body_name) # Extract Award BIOS PhoenixNet ROS filesystem. if not proc or b'Found Award BIOS.' in proc.stdout: for dest_dir_file in dest_dir_files: # Read and check for ROS header. dest_dir_file_path = os.path.join(dest_dir_0, dest_dir_file) in_f = open(dest_dir_file_path, 'rb') dest_dir_file_header = in_f.read(3) if dest_dir_file_header == b'ROS': self.debug_print('Extracting PhoenixNet ROS:', dest_dir_file) # Create new destination directory for the expanded ROS. dest_dir_ros = os.path.join(dest_dir_0, dest_dir_file + ':') if util.try_makedirs(dest_dir_ros): # Skip initial header. in_f.seek(32) # Parse file entries. while True: # Read file entry header. header = in_f.read(32) if len(header) != 32: break file_size, = struct.unpack(' 1: self.debug_print('ROS file:', file_name) out_f = open(os.path.join(dest_dir_ros, file_name), 'wb') out_f.write(data) out_f.close() # Run image converter on the desstination directory. self.image_extractor.convert_inline(os.listdir(dest_dir_ros), dest_dir_ros) # Don't remove ROS as the analyzer uses it for PhoenixNet detection. # Just remove the destination directory if it's empty. util.rmdirs(dest_dir_ros) in_f.close() # Convert any BIOS logo images in-line (to the same destination directory). self.image_extractor.convert_inline(dest_dir_files, dest_dir_0) # Create flag file on the destination directory for the analyzer to # treat it as a big chunk of data. open(os.path.join(dest_dir_0, ':combined:'), 'wb').close() # Hardlink or copy any header file to extracted directory, to help with # identifying Intel BIOSes. See AMIAnalyzer.can_handle for more information. parent_header = os.path.join(os.path.dirname(file_path_abs), ':header:') if os.path.exists(parent_header): util.hardlink_or_copy(parent_header, os.path.join(dest_dir_0, ':header:')) # Remove BIOS file. try: os.remove(file_path) except: pass # Return destination directory path. return dest_dir_0 class CPUZExtractor(Extractor): """Extract CPU-Z BIOS dump reports.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Patterns for parsing a report hex dump. self._cpuz_pattern = re.compile(b'''CPU-Z version\\t+([^\\r\\n]+)''') self._hex_pattern = re.compile(b'''[0-9A-F]+\\t((?:[0-9A-F]{2} ){16})\\t''') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if this is not a CPU-Z dump. cpuz_match = self._cpuz_pattern.search(file_header) if not cpuz_match: return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Read up to 16 MB as a safety net. file_header += util.read_complement(file_path, file_header) # Convert hex back to binary. try: f = open(os.path.join(dest_dir, 'cpuz.bin'), 'wb') for match in self._hex_pattern.finditer(file_header): f.write(codecs.decode(match.group(1).replace(b' ', b''), 'hex')) f.close() except: return True # Create header file with the CPU-Z version string. try: f = open(os.path.join(dest_dir, ':header:'), 'wb') f.write(cpuz_match.group(1)) f.close() except: pass # Remove report file. try: os.remove(file_path) except: pass # Return destination directory path. return dest_dir class DellExtractor(Extractor): """Extract Dell/Phoenix ROM BIOS PLUS images. Based on dell_inspiron_1100_unpacker.py""" def _memcpy(self, arr1, off1, arr2, off2, count): while count: if off1 < len(arr1): try: arr1[off1] = arr2[off2] except: break elif off1 >= len(arr1): while off1 >= len(arr1): arr1.append(0xFF) continue else: break off1 += 1 off2 += 1 count -= 1 def _dell_unpack(self, indata): srcoff = 0 dstoff = 0 src = bytearray(indata) dst = bytearray() inlen = len(indata) while srcoff < inlen: b = src[srcoff] nibl, nibh = b & 0x0F, (b >> 4) & 0x0F srcoff += 1 if nibl: if nibl == 0xF: al = src[srcoff] ah = src[srcoff+1] srcoff += 2 cx = nibh | (ah << 4) count = (cx & 0x3F) + 2 delta = ((ah >> 2) << 8) | al else: count = nibl + 1 delta = (nibh << 8) | src[srcoff] srcoff += 1 self._memcpy(dst, dstoff, dst, dstoff - delta - 1, count) dstoff += count elif nibh == 0x0E: count = src[srcoff] + 1 srcoff += 1 self._memcpy(dst, dstoff, dst, dstoff - 1, count) dstoff += count else: if nibh == 0x0F: count = src[srcoff] + 15 srcoff += 1 else: count = nibh + 1 self._memcpy(dst, dstoff, src, srcoff, count) dstoff += count srcoff += count return dst def _dell_unpack_alt(self, indata): srcoff = 0 dstoff = 0 src = bytearray(indata) dst = bytearray() inlen = len(indata) while srcoff < inlen: b = src[srcoff] nibl, nibh = b & 0x0F, (b >> 4) & 0x0F srcoff += 1 if nibl: count = nibl + 1 delta = (nibh << 8) | src[srcoff] srcoff += 1 self._memcpy(dst, dstoff, dst, dstoff - delta - 1, count) dstoff += count else: count = nibh + 1 self._memcpy(dst, dstoff, src, srcoff, count) dstoff += count srcoff += count return dst def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Read up to 16 MB as a safety net. file_header += util.read_complement(file_path, file_header) # Stop if this is not the type of BIOS we're looking for. copyright_string = b'\xF0\x00Copyright 1985-\x02\x04\xF0\x0F8 Phoenix Technologies Ltd.' alt_mode = False offset = file_header.find(copyright_string) if offset < 5: alt_mode = True copyright_string = b'\xE0Copyright 1985-\x02\x04\xF08 Phoenix Techno\xD0logies Ltd.' offset = file_header.find(copyright_string) if offset < 2: copyright_string = b'Copyright 1985-1988 Phoenix Technologies Ltd.' offset = file_header.find(copyright_string) if offset > 2 and (offset & 0xffff) == 0 and file_header[2] == 0xf0: # partial compression (OptiPlex 5xx) offset = 2 else: return False # Determine the length format. if alt_mode: # 16-bit length (no module type prefix) with different compression. length_size = 2 struct_format = '<0sH' elif file_header[offset - 5] == 1: # 32-bit length. length_size = 5 struct_format = ' 0: self.debug_print('Extracting', offset, 'bytes of EC code') f = open(os.path.join(dest_dir_0, 'ec.bin'), 'wb') f.write(file_header[:offset]) f.close() # Extract modules. file_size = len(file_header) module_number = 0 while (offset + length_size) < file_size: # Read module type and length. module_type, module_length = struct.unpack(struct_format, file_header[offset:offset + length_size]) if (alt_mode and module_length in (0, 0xFFFF)) or module_type == 0xFF: break self.debug_print('Extracting module number', module_number, 'type', module_type, 'size', module_length) offset += length_size # Decompress data if required. data = file_header[offset:offset + module_length] if module_type != 0x0C: try: data = (alt_mode and self._dell_unpack_alt or self._dell_unpack)(data) if len(data) == 0: self.debug_print('Extraction produced blank output') except: self.debug_print('Extraction failed') offset += module_length # Write module. f = open(os.path.join(dest_dir_0, 'module_{0:02}.bin'.format(module_number)), 'wb') f.write(data) f.close() # Increase filename counter. module_number += 1 # Extract remainder if applicable. if offset < file_size: try: f = open(os.path.join(dest_dir_0, 'remainder.bin'), 'wb') f.write(file_header[offset:]) f.close() except: pass # Create header file with the copyright string, to tell the analyzer # this BIOS went through this extractor. try: f = open(os.path.join(dest_dir_0, ':header:'), 'wb') f.write(copyright_string) f.close() except: pass # Remove BIOS file. try: os.remove(file_path) except: pass # Return destination directory path. return dest_dir_0 class DiscardExtractor(Extractor): """Detect and discard known non-useful file types.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # File signatures to discard. self._signature_pattern = re.compile( # images b'''\\x0A[\\x00-\\x05][\\x00-\\x01][\\x01\\x02\\x04\\x08]|''' # PCX b'''BM|''' # BMP b'''\\xFF\\xD8\\xFF|''' # JPEG b'''GIF8|''' # GIF b'''\\x89PNG|''' # PNG # documents b'''%PDF|''' # PDF b'''\\xD0\\xCF\\x11\\xE0\\xA1\\xB1\\x1A\\xE1|''' # Office (mszip) b'''\\x3F\\x5F\\x03\\x00|''' # WinHelp b'''<(?:\\![Dd][Oo][Cc][Tt][Yy][Pp][Ee]|[Hh][Tt][Mm][Ll])[ >]|''' # HTML (a cursory check ought not to upset anyone) # executables b'''(\\x7FELF)|''' # ELF # reports b'''CPU-Z TXT Report|\\s{7}File: A|-+\\[ AIDA32 |HWiNFO64 Version |3DMARK2001 PROJECT|Report Dr. Hardware|''' b'''\\r\\n(?:\\s+HWiNFO v|\\r\\n\\s+\\r\\n\\s+Microsoft Diagnostics version )|''' b'''SIV[^\\s]+ - System Information Viewer V|UID,Name,Score,''' ) def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Determine if this is a known non-useful file type through the signature pattern. match = self._signature_pattern.match(file_header) if match: # Don't discard LinuxBIOS ELFs. if match.group(1) and file_header[128:136] == b'ELFBoot\x00': return False # Remove file and stop. try: os.remove(file_path) except: pass return True # Not a known file type, cleared to go. return False class ImageExtractor(Extractor): """Extract BIOS logo images by converting them into PNG.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Standard EGA/VGA palette for v1 and palette-less v2 Award EPAs. self._vga_palette = [ 0x000000, 0x0000aa, 0x00aa00, 0x00aaaa, 0xaa0000, 0xaa00aa, 0xaa5500, 0xaaaaaa, 0x555555, 0x5555ff, 0x55ff55, 0x55ffff, 0xff5555, 0xff55ff, 0xffff55, 0xffffff, 0x000000, 0x101010, 0x202020, 0x353535, 0x454545, 0x555555, 0x656565, 0x757575, 0x8a8a8a, 0x9a9a9a, 0xaaaaaa, 0xbababa, 0xcacaca, 0xdfdfdf, 0xefefef, 0xffffff, 0x0000ff, 0x4100ff, 0x8200ff, 0xbe00ff, 0xff00ff, 0xff00be, 0xff0082, 0xff0041, 0xff0000, 0xff4100, 0xff8200, 0xffbe00, 0xffff00, 0xbeff00, 0x82ff00, 0x41ff00, 0x00ff00, 0x00ff41, 0x00ff82, 0x00ffbe, 0x00ffff, 0x00beff, 0x0082ff, 0x0041ff, 0x8282ff, 0x9e82ff, 0xbe82ff, 0xdf82ff, 0xff82ff, 0xff82df, 0xff82be, 0xff829e, 0xff8282, 0xff9e82, 0xffbe82, 0xffdf82, 0xffff82, 0xdfff82, 0xbeff82, 0x9eff82, 0x82ff82, 0x82ff9e, 0x82ffbe, 0x82ffdf, 0x82ffff, 0x82dfff, 0x82beff, 0x829eff, 0xbabaff, 0xcabaff, 0xdfbaff, 0xefbaff, 0xffbaff, 0xffbaef, 0xffbadf, 0xffbaca, 0xffbaba, 0xffcaba, 0xffdfba, 0xffefba, 0xffffba, 0xefffba, 0xdfffba, 0xcaffba, 0xbaffba, 0xbaffca, 0xbaffdf, 0xbaffef, 0xbaffff, 0xbaefff, 0xbadfff, 0xbacaff, 0x000071, 0x1c0071, 0x390071, 0x550071, 0x710071, 0x710055, 0x710039, 0x71001c, 0x710000, 0x711c00, 0x713900, 0x715500, 0x717100, 0x557100, 0x397100, 0x1c7100, 0x007100, 0x00711c, 0x007139, 0x007155, 0x007171, 0x005571, 0x003971, 0x001c71, 0x393971, 0x453971, 0x553971, 0x613971, 0x713971, 0x713961, 0x713955, 0x713945, 0x713939, 0x714539, 0x715539, 0x716139, 0x717139, 0x617139, 0x557139, 0x457139, 0x397139, 0x397145, 0x397155, 0x397161, 0x397171, 0x396171, 0x395571, 0x394571, 0x515171, 0x595171, 0x615171, 0x695171, 0x715171, 0x715169, 0x715161, 0x715159, 0x715151, 0x715951, 0x716151, 0x716951, 0x717151, 0x697151, 0x617151, 0x597151, 0x517151, 0x517159, 0x517161, 0x517169, 0x517171, 0x516971, 0x516171, 0x515971, 0x000041, 0x100041, 0x200041, 0x310041, 0x410041, 0x410031, 0x410020, 0x410010, 0x410000, 0x411000, 0x412000, 0x413100, 0x414100, 0x314100, 0x204100, 0x104100, 0x004100, 0x004110, 0x004120, 0x004131, 0x004141, 0x003141, 0x002041, 0x001041, 0x202041, 0x282041, 0x312041, 0x392041, 0x412041, 0x412039, 0x412031, 0x412028, 0x412020, 0x412820, 0x413120, 0x413920, 0x414120, 0x394120, 0x314120, 0x284120, 0x204120, 0x204128, 0x204131, 0x204139, 0x204141, 0x203941, 0x203141, 0x202841, 0x2d2d41, 0x312d41, 0x352d41, 0x3d2d41, 0x412d41, 0x412d3d, 0x412d35, 0x412d31, 0x412d2d, 0x41312d, 0x41352d, 0x413d2d, 0x41412d, 0x3d412d, 0x35412d, 0x31412d, 0x2d412d, 0x2d4131, 0x2d4135, 0x2d413d, 0x2d4141, 0x2d3d41, 0x2d3541, 0x2d3141, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000, 0x000000 ] # Header pattern for common format images. self._pil_pattern = re.compile( b'''\\x0A[\\x00-\\x05][\\x00-\\x01][\\x01\\x02\\x04\\x08]|''' # PCX b'''BM|''' # BMP b'''\\xFF\\xD8\\xFF|''' # JPEG b'''GIF8|''' # GIF b'''\\x89PNG''' # PNG ) def convert_inline(self, dest_dir_files, dest_dir_0): # Detect and convert image files. for dest_dir_file in dest_dir_files: # Read 64 KB, which is enough to ascertain any potential logo type, # even if embedded in the file. (Monorail SiS 550x: PCX in AMI module) dest_dir_file_path = os.path.join(dest_dir_0, dest_dir_file) if os.path.isdir(dest_dir_file_path): continue f = open(dest_dir_file_path, 'rb') dest_dir_file_header = f.read(65536) f.close() # Run ImageExtractor. image_dest_dir = dest_dir_file_path + ':' if self.extract(dest_dir_file_path, dest_dir_file_header, image_dest_dir, image_dest_dir, any_offset=True): # Remove destination directory if it was created but is empty. util.rmdirs(image_dest_dir) def extract(self, file_path, file_header, dest_dir, dest_dir_0, any_offset=False): # Stop if PIL is not available or this file is too small. if not PIL.Image or len(file_header) < 16: return False # Determine if this is an image, and which type it is. func = None image_data_offset = 0 if file_header[:4] == b'AWBM': # Get width and height for a v2 EPA. width, height = struct.unpack('= 8 + (width * height): func = self._convert_epav2_8b else: func = self._convert_epav2_4b elif file_header[:2] == b'PG': # Get width and height for a Phoenix Graphics image. width, height = struct.unpack(' 18 + payload_size: palette_size, = struct.unpack('= 20 + (4 * palette_size) + payload_size: # Special marker that the palette should be read. width = -width print('pgx', width, height) if width != 0 and height != 0: func = self._convert_pgx if not func: # Determine if this file is the right size for a v1 EPA. width, height = struct.unpack('BB', file_header[:2]) if os.path.getsize(file_path) == 72 + (15 * width * height): func = self._convert_epav1 else: # Determine if this is a common image format. match = (any_offset and self._pil_pattern.search or self._pil_pattern.match)(file_header) if match: func = self._convert_pil image_data_offset = match.start(0) self.debug_print('PIL signature', match.group(0), 'on', file_path, '@', hex(image_data_offset)) else: # Stop if this is not an image. return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir_0): return True # Read up to 16 MB (+ data offset) as a safety net. max_size = 16777216 + image_data_offset file_header += util.read_complement(file_path, file_header, max_size) # Stop if the file was cut off, preventing parsing exceptions. if len(file_header) == max_size: return True # Run extractor function, and stop if it was not successful. self.debug_print('Calling', func.__name__, 'on', file_path) ret = func(file_header, image_data_offset, width, height, dest_dir_0) if not ret: return True # Remove original file if it's the entire image (with a maximum margin of 1 byte). if ret == True or len(file_header) - ret <= 1: try: os.remove(file_path) except: pass return dest_dir_0 def _convert_epav1(self, file_data, image_data_offset, width, height, dest_dir_0): # Write file type as a header. self._write_type(dest_dir_0, 'EPA v1') # Fill color map. color_map = [] index = 2 for x in range(width * height): # Read character cell color information. color = file_data[index] index += 1 # Save RGB background and foreground color. color_map.append((self._vga_palette[color & 0x0f], self._vga_palette[color >> 4])) # Create output image. image = PIL.Image.new('RGB', (width * 8, height * 14)) # Read image data. for y in range(height): for x in range(width): # Determine foreground/background colors for this character cell. fg_color, bg_color = color_map.pop(0) # Read the 14 row bitmaps. for cy in range(14): # Stop row bitmap processing if the file is truncated. if index >= len(file_data): width = height = 0 break # Read bitmap byte. bitmap = file_data[index] index += 1 # Parse the foreground/background bitmap. for cx in range(8): # Determine palette color and write pixel. color = (bitmap & (1 << cx)) and fg_color or bg_color image.putpixel(((x * 8) + (7 - cx), (y * 14) + cy), ((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff)) # Stop column processing if the file is truncated. if width == 0 or len(color_map) == 0: break # Stop row processing if the file is truncated. if height == 0 or len(color_map) == 0: break # Save output image. return self._save_image(image, dest_dir_0) def _convert_epav2_4b(self, file_data, image_data_offset, width, height, dest_dir_0): # Read palette if the file contains one, while # writing the file type as a header accordingly. palette = self._read_palette_epav2(file_data, -52, False) if palette: self._write_type(dest_dir_0, 'EPA v2 4-bit (with palette)') else: self._write_type(dest_dir_0, 'EPA v2 4-bit (without palette)') # Use standard EGA palette. palette = self._vga_palette # Create output image. image = PIL.Image.new('RGB', (width, height)) # Read image data. index = 8 bitmap_width = math.ceil(width / 8) for y in range(height): for x in range(bitmap_width): # Stop column processing if the file is truncated. if index + x + (bitmap_width * 3) >= len(file_data): index = 0 break for cx in range(8): # Skip this pixel if it's outside the image width. output_x = (x * 8) + cx if output_x >= width: continue # Read color values. Each bit is stored in a separate bitmap. pixel = (file_data[index + x] >> (7 - cx)) & 1 pixel |= ((file_data[index + x + bitmap_width] >> (7 - cx)) & 1) << 1 pixel |= ((file_data[index + x + (bitmap_width * 2)] >> (7 - cx)) & 1) << 2 pixel |= ((file_data[index + x + (bitmap_width * 3)] >> (7 - cx)) & 1) << 3 # Determine palette color and write pixel. if pixel > len(palette): pixel = len(palette) - 1 color = palette[pixel] image.putpixel((output_x, y), ((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff)) # Stop row processing if the file is truncated. if index == 0: break # Move on to the next set of 4 bitmaps. index += bitmap_width * 4 # Save output image. return self._save_image(image, dest_dir_0) def _convert_epav2_8b(self, file_data, image_data_offset, width, height, dest_dir_0): # Read palette if the file contains one, while # writing the file type as a header accordingly. palette = self._read_palette_epav2(file_data, -772) if palette: self._write_type(dest_dir_0, 'EPA v2 8-bit (with palette)') else: self._write_type(dest_dir_0, 'EPA v2 8-bit (without palette)') # Use standard VGA palette. palette = self._vga_palette # Create output image. image = PIL.Image.new('RGB', (width, height)) # Read image data. index = 8 for y in range(height): for x in range(width): # Read pixel. pixel = file_data[index] index += 1 # Determine palette color and write pixel. if pixel > len(palette): pixel = len(palette) - 1 color = palette[pixel] image.putpixel((x, y), ((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff)) # Save output image. return self._save_image(image, dest_dir_0) def _convert_pgx(self, file_data, image_data_offset, width, height, dest_dir_0): # Read palette if the file contains one, while # writing the file type as a header accordingly. if width < 0: # Normalize width. width = -width # Read palette. palette_size, = struct.unpack('I', palette_color) # shortcut to parse _RGB value palette_index += 1 index += 4 self._write_type(dest_dir_0, 'PGX (with {0}-color palette)'.format(palette_size)) else: # Use standard EGA palette. palette = self._vga_palette self._write_type(dest_dir_0, 'PGX (without palette)') # Create output image. image = PIL.Image.new('RGB', (width, height)) # Read image data. This looks a lot like EPA v2 4-bit but it's slightly different. index = 18 bitmap_width = math.ceil(width / 8) bitmap_size = height * bitmap_width for y in range(height): for x in range(bitmap_width): # Stop column processing if the file is truncated. if index + x + (bitmap_size * 3) >= len(file_data): index = 0 break for cx in range(8): # Skip this pixel if it's outside the image width. output_x = (x * 8) + cx if output_x >= width: continue # Read color values. Each bit is stored in a separate bitmap. pixel = (file_data[index + x] >> (7 - cx)) & 1 pixel |= ((file_data[index + x + bitmap_size] >> (7 - cx)) & 1) << 1 pixel |= ((file_data[index + x + (bitmap_size * 2)] >> (7 - cx)) & 1) << 2 pixel |= ((file_data[index + x + (bitmap_size * 3)] >> (7 - cx)) & 1) << 3 # Determine palette color and write pixel. if pixel > len(palette): pixel = len(palette) - 1 color = palette[pixel] image.putpixel((output_x, y), ((color >> 16) & 0xff, (color >> 8) & 0xff, color & 0xff)) # Stop row processing if the file is truncated. if index == 0: break # Move on to the next line in the 4 bitmaps. index += bitmap_width # Save output image. return self._save_image(image, dest_dir_0) def _convert_pil(self, file_data, image_data_offset, width, height, dest_dir_0): # Load image. try: file_data_io = io.BytesIO(file_data[image_data_offset:]) image = PIL.Image.open(file_data_io) # Don't save image if it's too small. x, y = image.size if (x * y) < 10000: raise Exception('too small') except: self.debug_print('PIL open failed') return False # Write the file type as a header. self._write_type(dest_dir_0, image.format) # Save output image. if image.format in ('GIF', 'PNG', 'JPEG'): if image.format == 'JPEG': ext = 'jpg' else: ext = image.format.lower() try: f = open(os.path.join(dest_dir_0, 'image.' + ext), 'wb') f.write(file_data[image_data_offset:]) f.close() return True except: self.debug_print('As-is copy failed') return False elif self._save_image(image, dest_dir_0): return file_data_io.tell() else: return False def _read_palette_epav2(self, file_data, rgbs_offset, rgb=True): # Stop if this file has no palette. if file_data[rgbs_offset:rgbs_offset + 4] != b'RGB ': return None # Read 6-bit palette entries, while converting to 8-bit. palette = [] index = rgbs_offset + 4 while index < 0: palette.append((file_data[index] << (rgb and 18 or 2)) | (file_data[index + 1] << 10) | (file_data[index + 2] << (rgb and 2 or 18))) index += 3 return palette def _save_image(self, image, dest_dir_0): # Save image to destination directory. image_path = os.path.join(dest_dir_0, 'image.png') try: image.save(image_path) return True except: self.debug_print('PIL save failed') # Clean up. try: os.remove(image_path) except: pass try: os.remove(os.path.join(dest_dir_0, ':header:')) except: pass return False def _write_type(self, dest_dir_0, identifier): self.debug_print('Type:', identifier) try: f = open(os.path.join(dest_dir_0, ':header:'), 'w') f.write(identifier) f.close() except: pass class FATExtractor(ArchiveExtractor): """Extract FAT disk images.""" def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Determine if this is a FAT filesystem. # Stop if this file is too small. if len(file_header) < 512: return False # Stop if this doesn't appear to be a FAT filesystem. if not self._is_fat(file_header): # Check for 4-byte AST filesystem followed by FAT filesystem. ast_size, unknown = struct.unpack('> 1) - 1) != int(length, 16): continue # Decode data. data = codecs.decode(data[:-2], 'hex') # Write data block at the specified address. f.seek(int(addr, 16)) f.write(data) # Finish destination file. f.close() except: return True # Create dummy header file. try: open(os.path.join(dest_dir, ':header:'), 'wb').close() except: pass # Remove file. try: os.remove(file_path) except: pass # Return destination directory. return dest_dir class ISOExtractor(ArchiveExtractor): """Extract ISO 9660 images.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Signature for identifying El Torito header data. self._eltorito_pattern = re.compile(b'''\\x01\\x00\\x00\\x00[\\x00-\\xFF]{26}\\x55\\xAA\\x88\\x04[\\x00-\\xFF]{3}\\x00[\\x00-\\xFF]{2}([\\x00-\\xFF]{4})''') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if this is not an ISO. if file_header[32769:32774] != b'CD001' or file_header[32777:32782] != b'CDROM': return False # Extract this as an archive. ret = self._extract_archive(file_path, dest_dir, remove=False) # Some El Torito hard disk images have an MBR (Lenovo ThinkPad UEFI updaters). # 7-Zip doesn't care about MBRs and just takes the El Torito sector count field # for granted, even though it may be inaccurate. Try to detect such inaccuracies. if type(ret) == str: # Check what 7-Zip tried to extract, if anything. elt_path = os.path.join(ret, '[BOOT]', 'Boot-HardDisk.img') try: elt_size = os.path.getsize(elt_path) except: elt_size = 0 # Does the size match known bad extractions? if elt_size == 512: # Read file. f = open(elt_path, 'rb') data = f.read(512) f.close() # Check for MBR boot signature. if data[-2:] == b'\x55\xAA': # Read up to 16 MB of the ISO as a safety net. file_header += util.read_complement(file_path, file_header) # Look for El Torito data. match = self._eltorito_pattern.search(file_header) if match: # Start a new El Torito extraction file. out_f = open(elt_path, 'wb') # Copy the entire ISO data starting from the boot offset. # Parsing the MBR would have pitfalls of its own... in_f = open(file_path, 'rb') in_f.seek(struct.unpack(' largest_part_size: largest_part_size = found_part_size # Stop if no main body parts were found somehow. if len(found_parts_main) == 0: return True # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Determine header-related sizes and offsets. start_offset = (file_header[90:95] != b'FLASH') and 512 or 0 version = util.read_string(file_header[start_offset + 112:]) # this assumes the chain name is less than 32 characters header_size = 112 + len(version) + 1 # and the logical area name is less than 24 characters, remaining = header_size & 31 # but there's no fast detection of Intel files otherwise if remaining: # padded to 32 bytes header_size += 32 - remaining part_data_offset = start_offset + header_size # Subtract header from largest part size. largest_part_size -= part_data_offset # Determine if this is an inverted BIOS. This is quite tricky, since # the header data and presence of boot blocks in the main body isn't # super accurate, so we make a best guess through the following rules: if have_rcv and not have_bbo: # Recovery boot block file present but no main boot block file present => inverted invert = True elif have_bbo and not have_rcv: # Main boot block file present but no recovery boot block file present => non-inverted invert = False elif len(version) <= 16: # Short version (first AMI and Phoenix runs) => inverted invert = True else: # Long version (second AMI and Phoenix runs) => non-inverted invert = have_rcv # backup check for AN430TX which is inverted but has rcv # Join the part lists together. found_parts = found_parts_main + found_parts_boot # Create destination file. dest_file_path = os.path.join(dest_dir, 'intel.bin') out_f = open(dest_file_path, 'wb') self.debug_print('Found', len(found_parts), 'parts, header size', header_size, 'bytes, largest part size', largest_part_size, 'bytes') # Copy parts to the destination file. bootblock_offset = None end_offset = 0 while len(found_parts) > 0: found_part_path, found_part_size = found_parts.pop(0) try: f = open(found_part_path, 'rb') # Read and parse header if present. if found_part_path[-4:].lower() == '.rcv': header = b'' logical_area = dest_offset = 0 logical_area_size = data_length = found_part_size else: f.seek(start_offset) header = f.read(part_data_offset) logical_area, logical_area_size = struct.unpack(' end_offset: end_offset = logical_area_size # Apply inversion if needed. if invert: dest_offset ^= 0x10000 # Determine the part's location. if logical_area == 0: # Place boot block at the end of the ROM. Usually, the last part is cut # short and the boot block slots in at the end of the gap, but D845PT has # a full 64 KB last part containing a copy of the BBO boot block data. if bootblock_offset == None: bootblock_offset = end_offset - data_length if bootblock_offset < 0: bootblock_offset = 0 dest_offset += bootblock_offset out_f.seek(dest_offset) # Copy data. self.debug_print(data_length, 'bytes @', hex(dest_offset), '=>', found_part_path) remaining = max(data_length, largest_part_size) part_data = b' ' while part_data and remaining > 0: part_data = f.read(min(remaining, 1048576)) out_f.write(part_data) remaining -= len(part_data) # Write padding. if logical_area == 1 and last_part == 0xff: if data_length <= 8192 and len(found_parts_boot) == 0: # Workaround for JN440BX, which requires its final # part (sized 8 KB) to be at the end of the image. self.debug_print('> Final part non-padded') remaining = 0 elif data_length == largest_part_size and ((dest_offset >> 16) & 1) == int(invert): # Workaround for SE440BX-2 and SRMK2, which require a # gap at the final 64 KB where the boot block goes. self.debug_print('> Final part gap') remaining += largest_part_size elif logical_area == 0 and dest_offset == bootblock_offset: # Don't pad a boot block insertion. remaining = 0 self.debug_print('> Adding', remaining, 'padding bytes') while remaining > 0: out_f.write(b'\xFF' * min(remaining, 1048576)) remaining -= 1048576 f.close() # Update ROM end offset. part_end_offset = out_f.tell() if part_end_offset > end_offset: end_offset = part_end_offset # Remove part. os.remove(found_part_path) except: import traceback traceback.print_exc() pass out_f.close() # Create new file with padding if the total size isn't a power of two. if end_offset > 0: padding_size = (1 << math.ceil(math.log2(end_offset))) - end_offset if padding_size > 0: try: # Create a new file. out_f = open(dest_file_path + '.padded', 'wb') # Write padding. self.debug_print('Adding', padding_size, 'bytes of initial padding') while padding_size > 0: out_f.write(b'\xFF' * min(padding_size, 1048576)) padding_size -= 1048576 # Write the original file contents. f = open(dest_file_path, 'rb') part_data = b' ' while part_data: part_data = f.read(1048576) out_f.write(part_data) f.close() out_f.close() # Remove the old file. try: os.remove(dest_file_path) except: pass # Move the new file into place. shutil.move(dest_file_path + '.padded', dest_file_path) except: pass # Copy the header to a file, so we can still get the BIOS version # from it in case the payload cannot be decompressed successfully. try: out_f = open(os.path.join(dest_dir, ':header:'), 'wb') out_f.write(file_header[start_offset:part_data_offset]) out_f.close() except: pass # Return destination directory. return dest_dir class IntelNewExtractor(Extractor): """Extract newer Intel single-part BIOS updates.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # BIOS payload header. Checking the first bytes makes sure # we don't accidentally parse $IBIOSI$ version strings. self._ibi_pattern = re.compile(b'''\\$IBI[\\x00-\\x4E\\x50-\\xFF][\\x00-\\x52\\x54-\\xFF][\\x00-\\x23\\x25-\\xFF][\\x00-\\xFF]([\\x00-\\xFF]{8})''') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if the payload header couldn't be found. match = self._ibi_pattern.search(file_header) if not match: return False # Parse payload header and stop if the sizes appear to be wrong. header_sizes = match.group(1) if header_sizes == b'\xAA\x55\xAA\x55\x55\xAA\x55\xAA': # This is a :header: file with an invalidated size field. return True header_size, payload_size = struct.unpack(' 65536 or payload_size > 16777216: return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Separate payload and header. try: # Open update file. in_f = open(file_path, 'rb') # Copy payload. payload_offset = match.start(0) + header_size try: in_f.seek(payload_offset) out_f = open(os.path.join(dest_dir, 'intel.bin'), 'wb') while payload_size > 0: data = in_f.read(min(payload_size, 1048576)) out_f.write(data) payload_size -= len(data) out_f.close() except: in_f.close() return True # Copy header. try: out_f = open(os.path.join(dest_dir, ':header:'), 'wb') # Copy data before the header size fields. in_f.seek(0) size = match.start(1) while size > 0: data = in_f.read(min(size, 1048576)) out_f.write(data) size -= len(data) # Invalidate the size fields so we don't process the header again. out_f.write(b'\xAA\x55\xAA\x55\x55\xAA\x55\xAA') # Write the rest of the header. in_f.seek(match.end(1)) size = payload_offset - match.end(1) while size > 0: data = in_f.read(min(size, 1048576)) out_f.write(data) size -= len(data) # Copy data after the payload. in_f.seek(payload_offset + payload_size) data = b' ' while data: data = in_f.read(1048576) out_f.write(data) out_f.close() except: pass # Remove update file. in_f.close() os.remove(file_path) except: pass # Return destination directory path. return dest_dir class InterleaveExtractor(Extractor): """Detect and de-interleave any interleaved ROMs.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # List of strings an interleaved BIOS might contain once deinterleaved. self._deinterleaved_strings = [ b'ALL RIGHTS RESERVED', b'All Rights Reserved', b'Illegal Interrupt No.', b'Phoenix Technologies Ltd.', # Phoenix b' COPR. IBM 198', # IBM and Tandon b'memory (parity error)', b'Copyright COMPAQ Computer Corporation', # Compaq b'Press any key when ready', # Access Methods b'* AMPRO Little Board', # AMPRO ] # Interleave the strings. self._interleaved_odd = [string[1::2] for string in self._deinterleaved_strings] self._interleaved_even = [string[::2] for string in self._deinterleaved_strings] self._interleaved_q3 = [string[3::4] for string in self._deinterleaved_strings] self._interleaved_q2 = [string[2::4] for string in self._deinterleaved_strings] self._interleaved_q1 = [string[1::4] for string in self._deinterleaved_strings] self._interleaved_q0 = [string[::4] for string in self._deinterleaved_strings] def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if this was already deinterleaved. dir_path, file_name = os.path.split(file_path) if os.path.exists(os.path.join(dir_path, ':combined:')): return False # Read up to 128 KB. file_header += util.read_complement(file_path, file_header, max_size=131072) # Check for interleaved strings. counterpart_string_sets = None sets_2 = [self._interleaved_odd, self._interleaved_even] sets_4 = [self._interleaved_q0, self._interleaved_q1, self._interleaved_q2, self._interleaved_q3] for part_set in (sets_2, sets_4): # Go through sets. for counterpart_set in part_set: # Go through strings. for string in counterpart_set: # Check if the string is present. if string in file_header: # Generate new string set list without this set. counterpart_string_sets = [new_set for new_set in part_set if new_set != counterpart_set] break # Stop if a set was found. if counterpart_string_sets: break if counterpart_string_sets: break # Stop if no interleaved strings could be found. if not counterpart_string_sets: return False # Acquire the multi-file lock. file_size = self.multifile_lock_acquire(file_path) # Create temporary interleaved data array. part_size = min(file_size, 16777216) data = [] # Look for each counterpart. dir_files = os.listdir(dir_path) dir_files.sort() counterpart_paths = [file_path] for counterpart_string_set in counterpart_string_sets: # Try to find this file's counterpart in the directory. counterpart_candidates = [] file_size = os.path.getsize(file_path) for file_in_dir in dir_files: # Skip seen files. file_in_dir_path = os.path.join(dir_path, file_in_dir) if file_in_dir_path in counterpart_paths: continue # Skip any files which differ in size. file_in_dir_size = 0 try: file_in_dir_size = os.path.getsize(file_in_dir_path) except: continue if file_in_dir_size != file_size: continue # Read up to 128 KB. file_in_dir_data = util.read_complement(file_in_dir_path, max_size=131072) if not file_in_dir_data: continue # Determine if this is a counterpart. counterpart = False for string in counterpart_string_set: if string in file_in_dir_data: counterpart = True break del file_in_dir_data # Move on if this is not a counterpart. if not counterpart: continue # Add to the list of candidates. counterpart_candidates.append(file_in_dir) # Find the closest counterpart candidate to this # file, and stop if no counterpart was found. counterpart_candidate = util.closest_prefix(file_name, counterpart_candidates, lambda x: util.remove_extension(x).lower()) if not counterpart_candidate: return False counterpart_path = os.path.join(dir_path, counterpart_candidate) counterpart_paths.append(counterpart_path) # Read into the data array. f = open(counterpart_path, 'rb') data.append(f.read(part_size)) f.close() # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Read this file into the data array. f = open(file_path, 'rb') data.insert(0, f.read(part_size)) f.close() # Write all deinterleaved permutations, as some sets may # contain the same interleaved string on more than one part. file_counter = 0 part_count = len(data) buf = bytearray(part_size * part_count) for permutation in itertools.permutations(range(part_count)): # Deinterleave from the array into the buffer. data_offset = 0 for data_index in permutation: buf[data_offset::part_count] = data[data_index] data_offset += 1 # Write deinterleaved file. f = open(os.path.join(dest_dir, 'deinterleaved_' + ''.join(util.base62[data_index] for data_index in permutation) + '.bin'), 'wb') f.write(buf) f.close() file_counter += 1 # Save some memory. Might be placebo, but it doesn't hurt. del buf del data # Move interleaved files to preserve them, # as some sets may deinterleave incorrectly. file_counter = 0 for counterpart_path in counterpart_paths: # Move original file. try: shutil.move(counterpart_path, os.path.join(dest_dir, 'interleaved_' + util.base62[file_counter] + '.bin')) except: pass file_counter += 1 # Remove the original file in case moving failed. try: os.remove(counterpart_path) except: pass # Create flag file on the destination directory for the analyzer # to treat it as a big chunk of data, combining all permutations. f = open(os.path.join(dest_dir, ':combined:'), 'wb') f.write(b'\x00' * part_count) f.close() # Return destination directory path. return dest_dir class MBRSafeExtractor(ArchiveExtractor): """Extract MBR disk images which appear to have a valid MBR.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Signature for identifying typical MBRs. self._mbr_pattern = re.compile(b'''(?:Error loading|Missing) operating system''') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Extract this as an archive if MBR signatures are present. if file_header[510:512] == b'\x55\xAA' and self._is_mbr(file_header): return self._extract_archive(file_path, dest_dir) # No MBR found. return False def _is_mbr(self, file_header): # Helper function to determine if this *really* looks like some kind of MBR. return self._mbr_pattern.search(file_header[:510]) class MBRUnsafeExtractor(MBRSafeExtractor): """Extract MBR disk images which have the MBR signature.""" def _is_mbr(self, file_header): # Anything goes over here. return True class OMFExtractor(Extractor): """Extract Fujitsu/ICL OMF BIOS files.""" def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if this file is too small (may be a copied header). if len(file_header) <= 112: return False # Stop if this is not an OMF file. if file_header[0] != 0xb2: return False # Stop if the OMF payload is incomplete or the sizes are invalid. # Should catch other files which start with 0xB2. file_size = os.path.getsize(file_path) if struct.unpack(' file_size: return False elif struct.unpack(' file_size - 112: return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Separate payload and header. try: # Open OMF file. in_f = open(file_path, 'rb') # Read header. header = in_f.read(112) # Copy payload. try: out_f = open(os.path.join(dest_dir, 'omf.bin'), 'wb') data = b' ' while data: data = in_f.read(1048576) out_f.write(data) # Truncate payloads with an extra byte. pos = out_f.tell() if pos & 1: out_f.truncate(pos - 1) out_f.close() except: in_f.close() return True # Write header. try: out_f = open(os.path.join(dest_dir, ':header:'), 'wb') out_f.write(header) out_f.close() except: pass # Remove OMF file. in_f.close() os.remove(file_path) except: pass # Return destination directory path. return dest_dir class PEExtractor(ArchiveExtractor): """Extract PE executables.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Signatures for flash tool executables which may have an embedded ROM. self._flashtool_pattern = re.compile( b'''(Software\\\\AMI\\\\AFUWIN)|''' # AMIBIOS 8 AFUWIN (many ASRock) b'''(AOpen FLASH ROM Utility R)|''' # AOpen (AP61) b'''Micro Firmware, Incorporated \\* ''' # Micro Firmware (Intel Monsoon surfaced so far) ) # Path to the deark utility. self._deark_path = os.path.abspath(os.path.join('deark', 'deark')) if not os.path.exists(self._deark_path): self._deark_path = None def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Determine if this is a PE/MZ. if file_header[:2] != b'MZ': return False # Cover PKZIP self-extractor (with PKLITE-compressed stub) # with an incorrect extension. This does appear to happen. if file_header[30:36] != b'PKLITE' or b'PK\x03\x04' not in file_header: # The MZ signature is way too short. Check extension as well to be safe. # This also stops :header: files from being re-processed as flash tools. if file_path[-4:].lower() not in ('.exe', '.dll', '.scr'): return False # Determine if this executable can be extracted with deark. ret = self.extract_deark(file_path, file_header, dest_dir) if ret: return ret # Read up to 16 MB as a safety net. file_header += util.read_complement(file_path, file_header) # Determine if this executable may have an embedded ROM. match = self._flashtool_pattern.search(file_header) if match: # Extract embedded ROM and stop if extraction was successful. ret = self._extract_flashtool(file_path, file_header, dest_dir, match) if ret: return ret # Extract this as an archive. return self._extract_archive(file_path, dest_dir) def extract_deark(self, file_path, file_header, dest_dir, remove=True, delegated=False): # Stop if deark is not available. if not self._deark_path: return False # Determine if deark can extract this file as an executable, and stop if it can't. file_path_abs = os.path.abspath(file_path) if not delegated: proc = subprocess.run([self._deark_path, '-opt', 'execomp', '-l', file_path_abs], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if proc.stdout[:12] != b'Module: exe\n' or proc.stdout[-16:] != b'\noutput.000.exe\n': return False self.debug_print('Using deark') # Stop if this is a dry run. if not dest_dir: return True # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Run deark command to extract the executable. orig_basename, orig_ext = os.path.splitext(os.path.basename(file_path_abs)) subprocess.run([self._deark_path, '-opt', 'execomp', '-o', orig_basename, file_path_abs], stdout=self._devnull, stderr=subprocess.STDOUT, cwd=dest_dir) # Assume failure if nothing was extracted. files_extracted = os.listdir(dest_dir) if len(files_extracted) < 1: self.debug_print('deark produced no files:', file_path) return False # Rename single file. if len(files_extracted) == 1: _, dest_ext = os.path.splitext(files_extracted[0]) if orig_ext.lower() == dest_ext.lower(): # keep casing if the extension hasn't changed dest_ext = orig_ext self.debug_print('Renaming', repr(files_extracted[0]), 'to', repr(orig_basename + dest_ext)) try: new_fn = orig_basename + dest_ext shutil.move(os.path.join(dest_dir, files_extracted[0]), os.path.join(dest_dir, new_fn)) if delegated: files_extracted = [new_fn] except: pass # Remove original file. if remove: try: os.remove(file_path) except: pass # Return destination directory path or first file path. if delegated: return os.path.join(dest_dir, files_extracted[0]) else: return dest_dir def _extract_flashtool(self, file_path, file_header, dest_dir, match): # Determine embedded ROM start and end offsets. if match.group(1): # AFUWIN # Look for markers and stop if one of them wasn't found. rom_start_offset = file_header.find(b'_EMBEDDED_ROM_START_\x00') if rom_start_offset == -1: return False rom_start_offset += 21 rom_end_offset = file_header.find(b'_EMBEDDED_ROM_END_\x00', rom_start_offset) if rom_end_offset == -1: return False else: # others # Round ROM size down to a power of two. try: file_size = os.path.getsize(file_path) except: file_size = len(file_header) rom_size = 1 << math.floor(math.log2(file_size)) rom_start_offset = file_size - rom_size # ROM located at the end of the file rom_end_offset = file_size # Adjust offsets if needed. if match.group(2): # AOpen # Stop if this file appears to be standalone AOFLASH. if file_size < 32768: return False # Skip checksum word at the end. All files I've seen # contain it, but check for its presence just in case. remaining = file_size & 15 if remaining == 2: rom_start_offset -= remaining rom_end_offset -= remaining # Skip BBOO data block. Same caveat as above. rom_half_offset = int((rom_start_offset + rom_end_offset) / 2) if file_header[rom_half_offset:rom_half_offset + 6] == b'*BBOO*': rom_end_offset = rom_half_offset # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Extract ROM. try: f = open(os.path.join(dest_dir, 'flashtool.bin'), 'wb') f.write(file_header[rom_start_offset:rom_end_offset]) f.close() except: return True # Write data before and after the embedded ROM as a header. try: f = open(os.path.join(dest_dir, ':header:'), 'wb') f.write(file_header[:rom_start_offset]) f.write(file_header[rom_end_offset:]) f.close() except: pass # Remove file. try: os.remove(file_path) except: pass # Return destination directory path. return dest_dir class TarExtractor(ArchiveExtractor): """Extract tar archives.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 00 00 00 = POSIX tar # 20 20 00 = GNU tar # 00 30 30 = some other form of tar? self._signature_pattern = re.compile(b'''ustar(?:\\x00(?:\\x00\\x00|\\x30\\x30)|\\x20\\x20\\x00)''') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Determine if this is a tar archive. for offset in (0, 257): if self._signature_pattern.match(file_header[offset:offset + 8]): # Extract this as an archive. return self._extract_archive(file_path, dest_dir) # Not a tar archive. return False class TrimondExtractor(Extractor): """Extract Trimond/Mitsubishi BIOS updates.""" def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Act only on files at least 128 KB with a chunk of 8-32 KB missing, as a # safety margin since only 256-minus-16 KB images have been observed so far. try: file_size = os.path.getsize(file_path) except: return False if file_size < 131072: return False pow2 = 1 << math.ceil(math.log2(file_size)) if pow2 - file_size not in (8192, 16384, 32768): return False # Acquire the multi-file lock. self.multifile_lock_acquire(file_path) # As a second safety layer, check for Trimond's flasher files. dir_path, file_name = os.path.split(file_path) dir_files = os.listdir(dir_path) dir_files_lower = [filename.lower() for filename in dir_files] if 'aflash.exe' not in dir_files_lower or 'cnv.exe' not in dir_files_lower or 'b.bat' not in dir_files_lower: return False # Look for other counterpart candidates. counterpart_candidates = [] for counterpart_name in dir_files: if counterpart_name == file_name: continue try: counterpart_size = os.path.getsize(os.path.join(dir_path, counterpart_name)) except: continue # Must add up to the next power of two. if (file_size + counterpart_size) == pow2: counterpart_candidates.append(counterpart_name) # Find the closest counterpart candidate to this # file, and stop if no counterpart was found. counterpart_candidate = util.closest_prefix(file_name, counterpart_candidates, lambda x: util.remove_extension(x).lower()) if not counterpart_candidate: return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Join both files together. counterpart_path = os.path.join(dir_path, counterpart_candidate) out_f = open(os.path.join(dest_dir, counterpart_candidate), 'wb') in_f = open(file_path, 'rb') data = b' ' while data: data = in_f.read(1048576) out_f.write(data) in_f.close() in_f = open(counterpart_path, 'rb') data = b' ' while data: data = in_f.read(1048576) out_f.write(data) in_f.close() out_f.close() # Create dummy header file on the destination directory. try: open(os.path.join(dest_dir, ':header:'), 'wb').close() except: pass # Remove files. try: os.remove(file_path) except: pass try: os.remove(counterpart_path) except: pass return dest_dir class UEFIExtractor(Extractor): """Extract UEFI BIOS images.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Known UEFI signatures. self._signature_pattern = re.compile(b'''EFI_|D(?:xe|XE)|P(?:ei|EI)''') # Ignore padding and microcode files. self._invalid_file_pattern = re.compile('''(?:Padding|Microcode)_''') # Path to the UEFIExtract utility. self._uefiextract_path = os.path.abspath('UEFIExtract') if not os.path.exists(self._uefiextract_path): self._uefiextract_path = None # /dev/null handle for suppressing output. self._devnull = open(os.devnull, 'wb') def extract(self, file_path, file_header, dest_dir, dest_dir_0): # Stop if UEFIExtract is not available. if not self._uefiextract_path: return False # Read up to 16 MB as a safety net. file_header += util.read_complement(file_path, file_header) # Stop if no UEFI signatures are found. if not self._signature_pattern.search(file_header): return False # Start UEFIExtract process. file_path_abs = os.path.abspath(file_path) try: subprocess.run([self._uefiextract_path, file_path_abs, 'unpack'], timeout=30, stdout=self._devnull, stderr=subprocess.STDOUT) except: pass # Remove report file. try: os.remove(file_path_abs + '.report.txt') except: pass # Stop if the dump directory was somehow not created. dump_dir = file_path_abs + '.dump' if not os.path.isdir(dump_dir): try: os.remove(report_file) except: pass return False # Move dump directory over to the destination. try: # Move within the same filesystem. os.rename(dump_dir, dest_dir_0) if not os.path.isdir(dest_dir_0): raise Exception() except: try: # Move across filesystems. shutil.move(dump_dir, dest_dir_0) if not os.path.isdir(dest_dir_0): raise Exception() except: # Remove left-overs and stop if the move failed. for to_remove in (dump_dir, dest_dir_0): try: shutil.rmtree(to_remove) except: pass return True # Go through the dump, counting valid .bin files and removing .txt ones. valid_file_count = 0 for scan_file_name in os.listdir(dest_dir_0): if scan_file_name[-4:] == '.bin': # Non-UEFI images will only produce padding and microcode files. if not self._invalid_file_pattern.match(scan_file_name): valid_file_count += 1 else: try: os.remove(os.path.join(dest_dir_0, scan_file_name)) except: pass # Assume failure if nothing valid was extracted. # Actual UEFI images produce thousands of files, so 5 is a safe barrier. if valid_file_count < 1: return False elif valid_file_count < 5: # Remove left-overs and stop. try: shutil.rmtree(dest_dir_0) except: pass return False # Convert any BIOS logo images in-line (to the same destination directory). self.image_extractor.convert_inline(os.listdir(dest_dir_0), dest_dir_0) # Create header file with a dummy string, to tell the analyzer # this BIOS went through this extractor. try: f = open(os.path.join(dest_dir_0, ':header:'), 'wb') f.write(b'\x00\xFFUEFIExtract\xFF\x00') f.close() except: pass # Create flag file on the destination directory for the analyzer to # treat it as a big chunk of data. open(os.path.join(dest_dir_0, ':combined:'), 'wb').close() # Remove BIOS file. try: os.remove(file_path) except: pass # Return destination directory path. return dest_dir_0 class VMExtractor(PEExtractor): """Extract files which must be executed in a virtual machine.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Known signatures. self._floppy_pattern = re.compile( b'''( FastPacket V[0-9])|''' # Siemens Nixdorf FastPacket b''', Sydex, Inc\\. All Rights Reserved\\.|''' # IBM Sydex b'''Disk eXPress Self-Extracting Diskette Image|''' # HP DXP b'''(\\x00Diskette Image Decompression Utility\\.\\x00)|''' # NEC in-house b'''(Copyright Daniel Valot |\\x00ARDI - \\x00)|''' # IBM ARDI b'''(Ready to build distribution image with the following attributes:)|''' # Zenith in-house b'''(Error reading the Softpaq File information)|''' # Compaq Softpaq b'''(DELLXBIOS[\\x00-\\xFF]+;C_FILE_INFO[\\x00-\\xFF]+<>)''' # Dell in-house ) self._eti_pattern = re.compile(b'''[0-9\\.\\x00]{10}[0-9]{2}/[0-9]{2}/[0-9]{2}\\x00{2}[0-9]{2}:[0-9]{2}:[0-9]{2}\\x00{3}''') self._rompaq_pattern = re.compile(b'''[\\x00-\\xFF]{12}[A-Z0-9]{7}\\x00[0-9]{2}/[0-9]{2}/[0-9]{2}\\x00''') # Filename sanitization pattern. self._dos_fn_pattern = re.compile('''[\\x00-\\x1F\\x7F-\\xFF\\\\/:\\*\\?"<>\\|]''') # /dev/null handle for suppressing output. self._devnull = open(os.devnull, 'wb') # Path to QEMU. self._qemu_path = None for path in ('qemu-system-i386', 'qemu-system-x86_64'): try: subprocess.run([path, '-version'], stdout=self._devnull, stderr=subprocess.STDOUT).check_returncode() self._qemu_path = path break except: pass # Check for other dependencies. self._dep_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(util.__file__))), 'vm') for dep in ('floppy.144', 'freedos.img'): if not os.path.exists(os.path.join(self._dep_dir, dep)): self._qemu_path = None break def extract(self, file_path, file_header, dest_dir, dest_dir_0, *, allow_deark=True): # Stop if QEMU or other dependencies are not available. if not self._qemu_path: return False # Check for cases which require this extractor. # All signatures should be within the first 32 KB or so. extractor = None extractor_kwargs = {} if file_header[:2] == b'MZ' and b'PK\x03\x04' not in file_header: # skip ZIP self-extractors with compressed stubs # Read up to 16 MB as a safety net. file_header += util.read_complement(file_path, file_header) match = self._floppy_pattern.search(file_header) if match: extractor = self._extract_floppy extractor_kwargs['match'] = match elif allow_deark and self.extract_deark(file_path, file_header, None): # avoid infinite loops # Acquire the multi-file lock if this is a ROMPAQ.EXE. # This is required for ROMPAQ extraction below. if os.path.basename(file_path).lower() == 'rompaq.exe': self.multifile_lock_acquire(file_path) extractor = self._extract_deark elif self._eti_pattern.match(file_header): extractor = self._extract_eti elif self._rompaq_pattern.match(file_header): # Acquire the multi-file lock. self.multifile_lock_acquire(file_path) # The ROMPAQ format appears to be version specific in some way. # We will only extract files that have a ROMPAQ.EXE next to them. dir_path = os.path.dirname(file_path) rompaq_path = None for file_in_dir in os.listdir(dir_path): if file_in_dir.lower() == 'rompaq.exe': rompaq_path = os.path.join(dir_path, file_in_dir) break # Now look for a PKLITE-decompressed ROMPAQ.EXE. dest_parent_dir = os.path.dirname(dest_dir) if not rompaq_path and os.path.isdir(dest_parent_dir): for file_in_dir in os.listdir(dest_parent_dir): if file_in_dir.lower() == 'rompaq.exe:': rompaq_path = os.path.join(dest_parent_dir, file_in_dir, file_in_dir[:-1]) if os.path.exists(rompaq_path): break else: rompaq_path = None # Enter ROMPAQ mode if the EXE was found. if rompaq_path: extractor = self._extract_rompaq extractor_kwargs['rompaq_path'] = rompaq_path # Stop if no case was found. if not extractor: return False # Create destination directory and stop if it couldn't be created. if not util.try_makedirs(dest_dir): return True # Run extractor. self.debug_print('Running', extractor.__name__) return extractor(file_path, file_header, dest_dir, dest_dir_0, **extractor_kwargs) def _run_qemu(self, hdd=None, hdd_snapshot=True, floppy=None, floppy_snapshot=True, vvfat=None, boot='c', monitor_cmd=None, monitor_flag_file=None): # Build QEMU arguments. args = [self._qemu_path, '-m', '32', '-boot', boot] if not self.debug or not os.getenv('DISPLAY'): args += ['-display', 'none', '-vga', 'none'] if monitor_cmd: args += ['-monitor', 'stdio'] for drive, drive_snapshot, drive_if in ((floppy, floppy_snapshot, 'floppy'), (hdd, hdd_snapshot, 'ide')): # Don't add this drive if an image was not specified. if not drive: # Add dummy floppy to prevent errors if no floppy is specified. if drive_if == 'floppy': drive = os.path.join(self._dep_dir, 'floppy.144') drive_snapshot = True else: continue # Add drive. args += ['-drive', 'if=' + drive_if + ',snapshot=' + (drive_snapshot and 'on' or 'off') + ',format=raw,file=' + drive.replace(',', ',,')] if vvfat: # Add vvfat if requested. args += ['-drive', 'if=ide,driver=vvfat,rw=on,dir=' + vvfat.replace(',', ',,')] # regular vvfat syntax can't handle : in path # Run QEMU. self.debug_print('Running QEMU with args:', args) try: if monitor_cmd and monitor_flag_file: proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=self._devnull, stderr=subprocess.STDOUT) # Wait for flag file if one was specified. if monitor_flag_file: spins = 0 while not os.path.exists(monitor_flag_file) and spins < 60: time.sleep(1) spins += 1 if spins < 60: self.debug_print('Monitor flag file found') else: self.debug_print('Monitor flag file timed out') # Send monitor command if one was specified, and wait for the QEMU process. proc.communicate(input=monitor_cmd, timeout=60) else: subprocess.run(args, input=monitor_cmd, timeout=60, stdout=self._devnull, stderr=subprocess.STDOUT) except: self.debug_print('Running QEMU failed (timed out?)') def _extract_floppy(self, file_path, file_header, dest_dir, dest_dir_0, *, match): """Extract DOS-based floppy self-extractors.""" # Only support 1.44 MB floppies for now. floppy_media = 'floppy.144' # Copy original file and blank floppy image to the destination directory. if match.group(6): # Dell in-house names the extracted file after the executable exe_name = 'dell.exe' else: exe_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.exe' exe_path = os.path.join(dest_dir, exe_name) image_path = os.path.join(dest_dir, util.random_name(8) + '.img') shutil.copy2(file_path, exe_path) shutil.copy2(os.path.join(self._dep_dir, floppy_media), image_path) flag_name = flag_path = None # Create batch file for calling the executable. bat_path = os.path.join(dest_dir, 'autoexec.bat') f = open(bat_path, 'wb') if match.group(2): # NEC in-house # This SFX has trouble with (at least) the FreeDOS memory manager. # Work around that by moving config.sys out of the way to disable the # memory manager, rebooting the system, then executing the SFX proper. f.write(b'c:\r\n') f.write(b'if not exist config.sys goto sfx\r\n') f.write(b'move /y config.sys config.old\r\n') f.write(b'echo o cf9 6|debug\r\n') # TRC reset f.write(b'exit\r\n') # just in case f.write(b':sfx\r\n') f.write(b'move /y config.old config.sys\r\n') # just in case again (snapshot shouldn't persist changes) elif match.group(3): # ARDI f.write(b'echo.|') elif match.group(4) or match.group(6): # Zenith in-house, Dell in-house f.write(b'a:\r\n') elif match.group(5): # Compaq Softpaq # Create flag file for sending the monitor commands. flag_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.dat' flag_path = os.path.join(dest_dir, flag_name) f.write(b'echo. >d:\\' + flag_name.encode('cp437', 'ignore') + b'\r\n') f.write(b'd:' + exe_name.encode('cp437', 'ignore')) if match.group(1): # FastPacket f.write(b' /b a:\r\n') elif match.group(3) or match.group(5): # ARDI or Compaq Softpaq f.write(b'\r\n') elif match.group(4): # Zenith in-house f.write(b' 0: try: os.remove(image_path) except: pass image_path = temp_image_path # Remove temporary files. (exename.tmp = FastPacket) util.remove_all((bat_path, exe_path, exe_path[:-3] + 'tmp', os.path.join(dest_dir, exe_name[:-3].upper() + 'TMP'), flag_path)) # Extract image as an archive. ret = self._extract_archive(image_path, dest_dir, remove=False) if type(ret) == str and len(os.listdir(dest_dir)) > 1: # Remove original file. try: os.remove(file_path) except: pass # Flag success. ret = dest_dir else: ret = False # Remove image. try: os.remove(image_path) except: pass return ret def _extract_eti(self, file_path, file_header, dest_dir, dest_dir_0): """Extract Evergreen ETI files.""" # Read ETI header. in_f = open(file_path, 'rb') header = in_f.read(0x1f) # Parse creation date and time. try: date = header[10:18].decode('cp437', 'ignore') time = header[20:28].decode('cp437', 'ignore') dt = datetime.datetime.strptime(date + ' ' + time, '%m/%d/%y %H:%M:%S') ctime = (dt - datetime.datetime(1970, 1, 1)).total_seconds() except: ctime = 0 # Start the extraction batch file. bat_f = open(os.path.join(dest_dir, 'autoexec.bat'), 'wb') bat_f.write(b'd:\r\n') # Extract files into individual ETIs. temp_files = ['autoexec.bat', 'contact.eti', 'contact.txt', 'prevlang.dat'] while True: # Parse file header. fn = in_f.read(12) # filename if fn == None: break nul_index = fn.find(b'\x00') if nul_index > -1: fn = fn[:nul_index] if len(fn) == 0: break fn = fn.decode('cp437', 'ignore') self.debug_print('ETI file:', fn) in_f.read(5) # rest of header size = struct.unpack(' 0: data = in_f.read(min(size, 1048576)) out_f.write(data) # data size -= len(data) out_f.close() # Finish the batch file. bat_f.close() # Run QEMU. self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), vvfat=dest_dir) # Remove temporary files. util.remove_all(temp_files, lambda x: (os.path.join(dest_dir, x), os.path.join(dest_dir, x.upper()))) # Check if anything was extracted. dest_dir_files = os.listdir(dest_dir) if len(dest_dir_files) > 0: # Remove original file. try: os.remove(file_path) except: pass # Set timestamps if applicable. if ctime > 0: for fn in dest_dir_files: try: os.utime(os.path.join(dest_dir, fn), (ctime, ctime)) except: pass return dest_dir else: return True def _extract_deark(self, file_path, file_header, dest_dir, dest_dir_0): """Extract compressed executables with deark and run them through the same pipeline. This is required for the following self-extractors, which contain a compressed stub: - Compaq Softpaq (PKLITE) - Dell in-house (LZEXE with no LZ91 signature) - NEC in-house (PKLITE) - Siemens Nixdorf FastPacket (LZEXE) - Zenith in-house (PKLITE) The decompressed files cannot be executed (they're garbage), so the pipeline has to run with the original file path, while file_header gets the decompressed executable's data.""" # Run deark extractor and stop if it wasn't successful. unpacked_path = self.extract_deark(file_path, file_header, dest_dir, remove=False, delegated=True) if type(unpacked_path) != str: return unpacked_path # Read unpacked file. decomp_file_data = util.read_complement(unpacked_path) # Run this same extractor with detectors pointed at the unpacked data. ret = self.extract(file_path, decomp_file_data, dest_dir, dest_dir_0, allow_deark=False) # Remove original file. try: os.remove(file_path) except: pass # Stop if extraction was successful. if ret: # Remove unpacked file. try: os.remove(unpacked_path) except: pass return ret # Keep the unpacked file around for other extractors to process. return dest_dir def _extract_rompaq(self, file_path, file_header, dest_dir, dest_dir_0, *, rompaq_path): """Extract Compaq ROMPAQ-compressed BIOS images using a ROMPAQ.EXE provided next to the image.""" # Copy original file and ROMPAQ.EXE to the destination directory. # Also determine output file name. rom_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.bin' rom_path = os.path.join(dest_dir, rom_name) exe_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.exe' exe_path = os.path.join(dest_dir, exe_name) try: shutil.copy2(file_path, rom_path) shutil.copy2(rompaq_path, exe_path) except: return True # Set a name for the unpacked file. unpacked_name = 'rompaq.bin' unpacked_path = os.path.join(dest_dir, unpacked_name) # Create batch file for extraction. bat_path = os.path.join(dest_dir, 'autoexec.bat') f = open(bat_path, 'wb') f.write(b'd:\r\n' + exe_name.encode('cp437', 'ignore') + b' /D ' + rom_name.encode('cp437', 'ignore') + b' ' + unpacked_name.encode('cp437', 'ignore') + b'\r\n') f.close() # Run QEMU. self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), vvfat=dest_dir) # Remove temporary files. util.remove_all((bat_path, rom_path, exe_path)) # Stop if unpacking was not successful. if not os.path.exists(unpacked_path): unpacked_path = os.path.join(dest_dir, unpacked_name.upper()) # just in case if not os.path.exists(unpacked_path): return False # Remove original file. try: os.remove(file_path) except: pass # Create dummy header file. try: open(os.path.join(dest_dir, ':header:'), 'wb').close() except: pass # Return destination directory path. return dest_dir