Switch PKLITE and LZEXE decompression to deark, also adding EXEPACK

This commit is contained in:
RichardG867
2022-05-22 00:42:12 -03:00
parent 273c971af8
commit ed1a838559

View File

@@ -254,6 +254,7 @@ class ArchiveExtractor(Extractor):
# Assume failure if nothing was extracted.
files_extracted = os.listdir(dest_dir)
if len(files_extracted) < 1:
self.debug_print('Extraction produced no files:', file_path)
return False
# Rename single file. (gzip/bzip2/etc.)
@@ -2115,19 +2116,32 @@ class PEExtractor(ArchiveExtractor):
b'''Micro Firmware, Incorporated \\* ''' # Micro Firmware (Intel Monsoon surfaced so far)
)
# Signatures for compressed executables.
self._exepack_pattern = re.compile(b'''\\xCD\\x21\\xB8\\xFF\\x4C\\xCD\\x21''')
# Path to the deark utility.
self._deark_path = os.path.abspath(os.path.join('deark', 'deark'))
if not os.path.exists(self._deark_path):
self._deark_path = None
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
# Determine if this is a PE/MZ.
if file_header[:2] != b'MZ':
return False
# The MZ signature is way too short. Check extension as well to be safe.
# This also stops :header: files from being re-processed as flash tools.
if file_path[-4:].lower() not in ('.exe', '.dll', '.scr'):
# Cover PKZIP self-extractor (with PKLITE-compressed stub)
# with an incorrect extension. This does appear to happen.
if file_header[30:36] != b'PKLITE' or b'PK\x03\x04' not in file_header:
# Cover PKZIP self-extractor (with PKLITE-compressed stub)
# with an incorrect extension. This does appear to happen.
if file_header[30:36] != b'PKLITE' or b'PK\x03\x04' not in file_header:
# The MZ signature is way too short. Check extension as well to be safe.
# This also stops :header: files from being re-processed as flash tools.
if file_path[-4:].lower() not in ('.exe', '.dll', '.scr'):
return False
# Determine if this executable can be extracted with deark.
ret = self.extract_deark(file_path, file_header, dest_dir)
if ret:
return ret
# Read up to 16 MB as a safety net.
file_header += util.read_complement(file_path, file_header)
@@ -2142,6 +2156,67 @@ class PEExtractor(ArchiveExtractor):
# Extract this as an archive.
return self._extract_archive(file_path, dest_dir)
def extract_deark(self, file_path, file_header, dest_dir, remove=True, delegated=False):
# Stop if deark is not available.
if not self._deark_path:
return False
# Determine deark module to use, and stop if nothing matches.
if file_header[28:32] == b'LZ91':
module = 'lzexe'
elif file_header[30:36] == b'PKLITE':
module = 'pklite'
elif self._exepack_pattern.search(file_header):
module = 'exepack'
else:
return False
self.debug_print('Using deark module:', module)
# Stop and return the module name if this is a dry run.
if not dest_dir:
return module
# Create destination directory and stop if it couldn't be created.
if not util.try_makedirs(dest_dir):
return True
# Run deark command to extract the executable.
orig_basename, orig_ext = os.path.splitext(os.path.basename(file_path))
subprocess.run([self._deark_path, '-m', module, '-o', orig_basename, os.path.abspath(file_path)], stdout=self._devnull, stderr=subprocess.STDOUT, cwd=dest_dir)
# Assume failure if nothing was extracted.
files_extracted = os.listdir(dest_dir)
if len(files_extracted) < 1:
self.debug_print('deark produced no files:', file_path)
return False
# Rename single file.
if len(files_extracted) == 1:
_, dest_ext = os.path.splitext(files_extracted[0])
if orig_ext.lower() == dest_ext.lower(): # keep casing if the extension hasn't changed
dest_ext = orig_ext
self.debug_print('Renaming', repr(files_extracted[0]), 'to', repr(orig_basename + dest_ext))
try:
new_fn = orig_basename + dest_ext
shutil.move(os.path.join(dest_dir, files_extracted[0]), os.path.join(dest_dir, new_fn))
if delegated:
files_extracted = [new_fn]
except:
pass
# Remove original file.
if remove:
try:
os.remove(file_path)
except:
pass
# Return destination directory path or first file path.
if delegated:
return os.path.join(dest_dir, files_extracted[0])
else:
return dest_dir
def _extract_flashtool(self, file_path, file_header, dest_dir, match):
# Determine embedded ROM start and end offsets.
if match.group(1): # AFUWIN
@@ -2448,7 +2523,7 @@ class UEFIExtractor(Extractor):
return dest_dir_0
class VMExtractor(ArchiveExtractor):
class VMExtractor(PEExtractor):
"""Extract files which must be executed in a virtual machine."""
def __init__(self, *args, **kwargs):
@@ -2490,7 +2565,7 @@ class VMExtractor(ArchiveExtractor):
self._qemu_path = None
break
def extract(self, file_path, file_header, dest_dir, dest_dir_0):
def extract(self, file_path, file_header, dest_dir, dest_dir_0, *, allow_deark=True):
# Stop if QEMU or other dependencies are not available.
if not self._qemu_path:
return False
@@ -2499,19 +2574,19 @@ class VMExtractor(ArchiveExtractor):
# All signatures should be within the first 32 KB or so.
extractor = None
extractor_kwargs = {}
if file_header[:2] == b'MZ' and b'PK\x03\x04' not in file_header: # skip self-extractors with compressed stubs
if file_header[28:32] == b'LZ91':
extractor = self._extract_lzexe
elif file_header[30:36] == b'PKLITE':
# Acquire the multi-file lock. This is required for ROMPAQ below.
self.multifile_lock_acquire(file_path)
if file_header[:2] == b'MZ' and b'PK\x03\x04' not in file_header: # skip ZIP self-extractors with compressed stubs
match = self._floppy_pattern.search(file_header)
if match:
extractor = self._extract_floppy
extractor_kwargs['match'] = match
elif allow_deark: # avoid infinite loops
module = self.extract_deark(file_path, file_header, None)
if module:
# Acquire the multi-file lock if this is a ROMPAQ.EXE.
if os.path.basename(file_path).lower() == 'rompaq.exe':
self.multifile_lock_acquire(file_path)
extractor = self._extract_pklite
else:
match = self._floppy_pattern.search(file_header)
if match:
extractor = self._extract_floppy
extractor_kwargs['match'] = match
extractor = self._extract_deark
elif self._eti_pattern.match(file_header):
extractor = self._extract_eti
elif self._rompaq_pattern.match(file_header):
@@ -2797,56 +2872,26 @@ class VMExtractor(ArchiveExtractor):
else:
return True
def _extract_lzexe(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract LZEXE executables and run them through the same pipeline. This is
required for Siemens Nixdorf FastPacket with its LZEXE-compressed stub."""
def _extract_deark(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract compressed executables with deark and run them through the same pipeline.
This is required for the following self-extractors, which contain a compressed stub:
- Compaq Softpaq (PKLITE)
- NEC in-house (PKLITE)
- Siemens Nixdorf FastPacket (LZEXE)
- Zenith in-house (PKLITE)
The decompressed files cannot be executed (they're garbage), so the pipeline has to run
with the original file path, while file_header gets the decompressed executable's data."""
# Copy original file to the destination directory.
exe_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.exe'
exe_path = os.path.join(dest_dir, exe_name)
try:
shutil.copy2(file_path, exe_path)
except:
return True
# Generate a name for the unpacked file.
unpacked_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.ulz'
unpacked_path = os.path.join(dest_dir, unpacked_name)
# Create batch file for extraction.
bat_path = os.path.join(dest_dir, 'autoexec.bat')
f = open(bat_path, 'wb')
f.write(b'd:\r\nc:unlzexe ' + exe_name.encode('cp437', 'ignore') + b' ' + unpacked_name.encode('cp437', 'ignore') + b'\r\n')
f.close()
# Run QEMU.
self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), vvfat=dest_dir)
# Remove temporary files.
util.remove_all((bat_path, exe_path, os.path.join(dest_dir, exe_name.upper())))
# Stop if unpacking was not successful.
if not os.path.exists(unpacked_path):
unpacked_path = os.path.join(dest_dir, unpacked_name.upper()) # just in case
if not os.path.exists(unpacked_path):
return False
# Strip any remains of LZEXE off the executable to avoid infinite loops.
try:
f = open(unpacked_path, 'r+b')
f.seek(28)
if f.read(4) == b'LZ91':
f.seek(28)
f.write(b'\xFF')
f.close()
except:
pass
# Run deark extractor and stop if it wasn't successful.
unpacked_path = self.extract_deark(file_path, file_header, dest_dir, remove=False, delegated=True)
if type(unpacked_path) != str:
return unpacked_path
# Read unpacked file.
decomp_file_data = util.read_complement(unpacked_path)
# Run this same extractor with detectors pointed at the unpacked data.
ret = self.extract(file_path, decomp_file_data, dest_dir, dest_dir_0)
ret = self.extract(file_path, decomp_file_data, dest_dir, dest_dir_0, allow_deark=False)
# Remove original file.
try:
@@ -2865,95 +2910,6 @@ class VMExtractor(ArchiveExtractor):
return ret
# Keep the unpacked file around for other extractors to process.
try:
shutil.move(exe_path, os.path.join(dest_dir, os.path.basename(file_path)))
except:
pass
return dest_dir
def _extract_pklite(self, file_path, file_header, dest_dir, dest_dir_0):
"""Extract PKLITE executables and run them through the same pipeline. This is required
for the NEC and Zenith in-house floppy extractors with their PKLITE-compressed stubs."""
# Copy original file to the destination directory.
exe_name = util.random_name(8, charset=util.random_name_nosymbols).lower() + '.exe'
exe_path = os.path.join(dest_dir, exe_name)
try:
shutil.copy2(file_path, exe_path)
except:
return True
# Create batch file for extraction.
bat_path = os.path.join(dest_dir, 'autoexec.bat')
f = open(bat_path, 'wb')
f.write(b'd:\r\nc:pklite -x ' + exe_name.encode('cp437', 'ignore') + b'\r\n')
f.close()
# Run QEMU.
self._run_qemu(hdd=os.path.join(self._dep_dir, 'freedos.img'), vvfat=dest_dir)
# PKLITE creates a temporary file and tries to replace the original file with it.
# At least in my Windows-based testing, this triggers an assertion failure in
# vvfat. Therefore, check if the temporary file is present and move it into place.
for tmp_name in ('pktmp$$$.$$$', 'PKTMP$$$.$$$'):
tmp_path = os.path.join(dest_dir, tmp_name)
try:
tmp_size = os.path.getsize(tmp_path)
except:
continue
if tmp_size > 0:
self.debug_print('Using PKLITE temporary file:', tmp_path)
try:
os.remove(exe_path)
except:
pass
try:
shutil.move(tmp_path, exe_path)
except:
self.debug_print('PKLITE temporary file move failed')
exe_path = tmp_path
# Remove batch file.
util.remove_all((bat_path,))
# Strip any remains of PKLITE off the executable to avoid infinite loops.
try:
f = open(exe_path, 'r+b')
f.seek(30)
if f.read(6) == b'PKLITE':
f.seek(30)
f.write(b'\xFF')
f.close()
except:
pass
# Read unpacked file.
decomp_file_data = util.read_complement(exe_path)
# Run this same extractor with detectors pointed at the unpacked data.
ret = self.extract(file_path, decomp_file_data, dest_dir, dest_dir_0)
# Remove original file.
try:
os.remove(file_path)
except:
pass
# Stop if extraction was successful.
if ret:
# Remove unpacked file.
try:
os.remove(exe_path)
except:
pass
return ret
# Keep the unpacked file around for other extractors to process.
try:
shutil.move(exe_path, os.path.join(dest_dir, os.path.basename(file_path)))
except:
pass
return dest_dir
def _extract_rompaq(self, file_path, file_header, dest_dir, dest_dir_0, *, rompaq_path):