OMFExtractor: Improve header parsing and add OCFExtractor

This commit is contained in:
RichardG867
2022-08-25 20:21:24 -03:00
parent e2d57611fb
commit 6814241c23
2 changed files with 76 additions and 6 deletions

View File

@@ -127,6 +127,7 @@ def extract_process(queue, abort_flag, multifile_lock, dir_number_path, next_dir
]
file_extractors += [
extractors.IntelExtractor(),
extractors.OCFExtractor(),
extractors.OMFExtractor(),
extractors.TrimondExtractor(),
extractors.InterleaveExtractor(),

View File

@@ -2103,25 +2103,93 @@ class MBRUnsafeExtractor(MBRSafeExtractor):
return True
class OCFExtractor(Extractor):
"""Extract Fujitsu/ICL OCF BIOS files."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# RLE header.
self._snipac_pattern = re.compile(b'''SNIPAC([0-9A-F]{2})([\\x00-\\xFF]{4})''')
def _expand_rle(self, match):
length, = struct.unpack('<I', match.group(2))
return bytes([int(match.group(1), 16)]) * length
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this is not an RLE compressed file.
if not self._snipac_pattern.search(file_header):
return False
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
# Decompress RLE data.
file_header = self._snipac_pattern.sub(self._expand_rle, file_header)
# Write decompressed data.
try:
out_f = open(os.path.join(dest_dir, 'ocf.bin'), 'wb')
out_f.write(file_header)
out_f.close()
except:
return True
# Create dummy header file.
try:
open(os.path.join(dest_dir, ':header:'), 'wb').close()
except:
pass
# Remove OCF file.
try:
os.remove(file_path)
except:
pass
# Return destination directory path.
return dest_dir
class OMFExtractor(Extractor):
"""Extract Fujitsu/ICL OMF BIOS files."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Quick header signature for checking validity.
self._header_pattern = re.compile(
b'''[\\x00-\\xFF]''' # arbitrary byte (not always B2!)
b'''([\\x00-\\xFF]{4})''' # file size
b'''([\\x1A\\x20-\\x7E]{32})''' # file timestamp
b'''[\\x00-\\xFF]{14}''' # more fields
b'''([\\x00\\x20-\\x7E]{8})''' # file signature (can be all 00 on auxiliary files at least)
)
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Stop if this file is too small (may be a copied header).
if len(file_header) <= 112:
return False
# Stop if this is not an OMF file.
if file_header[0] != 0xb2:
header_offset = 0
match = self._header_pattern.match(file_header)
if not match: # whatever has this offset header was lost to time
header_offset = 112
match = self._header_pattern.match(file_header[header_offset:header_offset + 112])
if not match:
return False
# Stop if the OMF payload is incomplete or the sizes are invalid.
# Should catch other files which start with 0xB2.
# Stop if the OMF size is invalid. Should catch other files that match the quick check.
file_size = os.path.getsize(file_path)
if struct.unpack('<I', file_header[1:5])[0] > file_size:
return False
elif struct.unpack('<I', file_header[108:112])[0] > file_size - 112:
omf_size, = struct.unpack('<I', match.group(1))
if omf_size > file_size:
return False
self.debug_print('Size', omf_size, 'timestamp', match.group(2), 'signature', match.group(3))
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
@@ -2133,6 +2201,7 @@ class OMFExtractor(Extractor):
in_f = open(file_path, 'rb')
# Read header.
in_f.seek(header_offset)
header = in_f.read(112)
# Copy payload.