Import biostools source

RichardG867
2021-12-15 21:19:32 -03:00
parent fa4467f4ab
commit d5b38bc94a
9 changed files with 4838 additions and 1 deletion

README.md

@@ -4,9 +4,10 @@ A toolkit for analyzing and extracting x86 BIOS ROM images (mostly) within the c
 ## System requirements
-* **Linux**. Unfortunately, we rely on tools which contain non-portable code and generate filenames that are invalid for Windows.
+* **Linux**. Unfortunately, we rely on tools which contain non-portable code and generate filenames that are invalid for Windows, as well as GNU-specific extensions to shell commands.
 * **Python 3.5** or newer.
 * **Standard gcc toolchain** for building the essential `bios_extract` tool.
+* **7-Zip** command line utility installed as `7z`.
 ## Installation

biostools/__init__.py Normal file (17 lines)

@@ -0,0 +1,17 @@
#!/usr/bin/python3
#
# 86Box A hypervisor and IBM PC system emulator that specializes in
# running old operating systems and software designed for IBM
# PC systems and compatibles from 1981 through fairly recent
# system designs based on the PCI bus.
#
# This file is part of the 86Box BIOS Tools distribution.
#
# Dummy module initialization file.
#
#
#
# Authors: RichardG, <richardg867@gmail.com>
#
# Copyright 2021 RichardG.
#

biostools/__main__.py Normal file (591 lines)

@@ -0,0 +1,591 @@
#!/usr/bin/python3 -u
#
# 86Box A hypervisor and IBM PC system emulator that specializes in
# running old operating systems and software designed for IBM
# PC systems and compatibles from 1981 through fairly recent
# system designs based on the PCI bus.
#
# This file is part of the 86Box BIOS Tools distribution.
#
# Main BIOS extractor and analyzer program.
#
#
#
# Authors: RichardG, <richardg867@gmail.com>
#
# Copyright 2021 RichardG.
#
import getopt, os, multiprocessing, re, subprocess, sys
from . import analyzers, extractors, formatters, util
# Constants.
MP_PROCESS_COUNT = 4
ANALYZER_MAX_CACHE_MB = 512
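# MP_PROCESS_COUNT sets the size of the extraction and analysis worker pools;
# ANALYZER_MAX_CACHE_MB bounds how much file data analyze_dir caches per directory.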
# Extraction module.
def extract_dir(file_extractors, dir_number_path, next_dir_number_path, scan_dir_path, scan_file_names):
"""Process a given directory for extraction."""
# Determine the destination subdirectory.
dest_subdir = scan_dir_path[len(dir_number_path):]
while dest_subdir[:len(os.sep)] == os.sep:
dest_subdir = dest_subdir[len(os.sep):]
# Iterate through files.
for scan_file_name in scan_file_names:
file_path = os.path.join(scan_dir_path, scan_file_name)
# Remove links.
if os.path.islink(file_path):
try:
os.remove(file_path)
except:
try:
os.rmdir(file_path)
except:
pass
continue
# Read header.
try:
f = open(file_path, 'rb')
file_data = f.read(32775) # upper limit set by ISOExtractor
f.close()
except:
# Permission issues or after-the-fact removal of other files by
# extractors can cause this. Give up.
continue
# Come up with a destination directory for this file.
dest_file_path = os.path.join(dest_subdir, scan_file_name + ':')
dest_dir = os.path.join(next_dir_number_path, dest_file_path)
dest_dir_0 = os.path.join(os.path.dirname(next_dir_number_path), '0', dest_file_path)
# Run through file extractors until one succeeds.
for extractor in file_extractors:
# Run the extractor.
try:
extractor_result = extractor.extract(file_path, file_data, dest_dir, dest_dir_0)
except:
# Log an error.
util.log_traceback('extracting', file_path)
continue
# Check if the extractor produced any results.
if extractor_result:
# Handle the line break ourselves, since Python prints the main
# body and line break separately, causing issues when multiple
# threads/processes are printing simultaneously.
print('{0} => {1}{2}\n'.format(file_path, extractor.__class__.__name__, (extractor_result == True) and ' (skipped)' or ''), end='')
break
# Remove destination directories if they were created but are empty.
for to_remove in (dest_dir, dest_dir_0):
util.rmdirs(to_remove)
# Remove this directory if it ends up empty.
util.rmdirs(scan_dir_path)
def extract_process(queue, dir_number_path, next_dir_number_path):
"""Main loop for the extraction multiprocessing pool."""
# Set up extractors.
file_extractors = [
extractors.DiscardExtractor(),
extractors.ISOExtractor(),
extractors.PEExtractor(),
extractors.FATExtractor(),
extractors.TarExtractor(),
extractors.ArchiveExtractor(),
extractors.HexExtractor(),
extractors.ImageExtractor(),
extractors.DellExtractor(),
extractors.IntelExtractor(),
extractors.OMFExtractor(),
extractors.InterleaveExtractor(),
extractors.BIOSExtractor(),
extractors.UEFIExtractor(),
]
# Receive work from the queue.
while True:
item = queue.get()
if item == None: # special item to stop the loop
break
extract_dir(file_extractors, dir_number_path, next_dir_number_path, *item)
def extract(dir_path, _, options):
"""Main function for extraction."""
# Check if the structure is correct.
if not os.path.exists(os.path.join(dir_path, '1')):
print('Incorrect directory structure. All data to unpack should be located inside', file=sys.stderr)
print('a directory named 1 in turn located inside the given directory.', file=sys.stderr)
return 2
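	# Extraction runs in numbered passes: everything under <dir>/1 is unpacked
	# into <dir>/2, whose results are unpacked into <dir>/3 and so on until a
	# pass yields no files; all passes are then merged into <dir>/0 below.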
# Check if bios_extract is there.
if not os.path.exists(os.path.abspath(os.path.join('bios_extract', 'src', 'bios_extract'))):
print('bios_extract binary not found, did you compile it?', file=sys.stderr)
return 3
# Open devnull file for shell command output.
devnull = open(os.devnull, 'wb')
# Recurse through directory numbers.
dir_number = 1
while True:
dir_number_path = os.path.join(dir_path, str(dir_number))
next_dir_number_path = os.path.join(dir_path, str(dir_number + 1))
# Fix permissions on extracted archives.
print('Fixing up directory {0}:'.format(dir_number), end=' ', flush=True)
try:
print('chown', end=' ', flush=True)
subprocess.run(['chown', '-hR', '--reference=' + dir_path, '--', dir_number_path], stdout=devnull, stderr=subprocess.STDOUT)
print('chmod', end=' ', flush=True)
subprocess.run(['chmod', '-R', 'u+rwx', '--', dir_number_path], stdout=devnull, stderr=subprocess.STDOUT) # execute for listing directories
except:
pass
print()
# Start multiprocessing pool.
print('Starting extraction on directory {0}'.format(dir_number), end='', flush=True)
queue = multiprocessing.Queue(maxsize=MP_PROCESS_COUNT)
mp_pool = multiprocessing.Pool(MP_PROCESS_COUNT, initializer=extract_process, initargs=(queue, dir_number_path, next_dir_number_path))
# Create next directory.
if not os.path.isdir(next_dir_number_path):
os.makedirs(next_dir_number_path)
# Scan directory structure. I really wanted this to have file-level
		# granularity, but IntelExtractor and InterleaveExtractor
# both require directory-level granularity for inspecting other files.
print(flush=True)
found_any_files = False
for scan_dir_path, scan_dir_names, scan_file_names in os.walk(dir_number_path):
if len(scan_file_names) > 0:
found_any_files = True
queue.put((scan_dir_path, scan_file_names))
# Stop if no files are left.
if not found_any_files:
			# Remove this directory and the next directory if they're empty.
try:
os.rmdir(dir_number_path)
dir_number -= 1
except:
pass
try:
os.rmdir(next_dir_number_path)
except:
pass
break
# Increase number.
dir_number += 1
# Stop multiprocessing pool and wait for its workers to finish.
for _ in range(MP_PROCESS_COUNT):
queue.put(None)
mp_pool.close()
mp_pool.join()
# Create 0 directory if it doesn't exist.
print('Merging directories:', end=' ')
merge_dest_path = os.path.join(dir_path, '0')
if not os.path.isdir(merge_dest_path):
os.makedirs(merge_dest_path)
	# Merge all directories into the 0 directory via hard links (cp -l) to avoid copying file data.
for merge_dir_name in range(1, dir_number + 1):
merge_dir_path = os.path.join(dir_path, str(merge_dir_name))
if not os.path.isdir(merge_dir_path):
continue
print(merge_dir_name, end=' ')
subprocess.run(['cp', '-rlaT', merge_dir_path, merge_dest_path], stdout=devnull, stderr=subprocess.STDOUT)
subprocess.Popen(['rm', '-rf', merge_dir_path], stdout=devnull, stderr=subprocess.STDOUT)
# Clean up.
devnull.close()
print()
return 0
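# Typical invocation, per main() below: python3 -m biostools -x <directory>,
# with all input files placed under <directory>/1.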
# Analysis module.
def analyze_dir(formatter, scan_base, file_analyzers, scan_dir_path, scan_file_names):
"""Process a given directory for analysis."""
# Sort file names for better predictability.
scan_file_names.sort()
# Set up caches.
files_flags = {}
files_data = {}
combined_oroms = []
header_data = None
# In combined mode (enabled by InterleaveExtractor and BIOSExtractor), we
# handle all files in the directory as a single large blob, to avoid any doubts.
combined = ':combined:' in scan_file_names
if combined:
files_data[''] = b''
# Read files into the cache.
	cache_quota = ANALYZER_MAX_CACHE_MB * 1048576 # MB to bytes
for scan_file_name in scan_file_names:
# Skip known red herrings. This check is legacy code with an unknown impact.
scan_file_name_lower = scan_file_name.lower()
if 'post.string' in scan_file_name_lower or 'poststr.rom' in scan_file_name_lower:
continue
# Read up to 16 MB as a safety net.
file_data = util.read_complement(os.path.join(scan_dir_path, scan_file_name))
# Write data to cache.
if scan_file_name == ':header:':
header_data = file_data
elif combined:
files_data[''] += file_data
# Add PCI option ROM IDs extracted from AMI BIOSes by bios_extract, since the ROM might not
# contain a valid PCI header to begin with. (Apple PC Card with OPTi Viper and AMIBIOS 6)
match = re.match('''amipci_([0-9a-f]{4})_([0-9a-f]{4})\.rom$''', scan_file_name_lower)
if match:
combined_oroms.append((int(match.group(1), 16), int(match.group(2), 16)))
else:
files_data[scan_file_name] = file_data
# Stop reading if the cache has gotten too big.
cache_quota -= len(file_data)
if cache_quota <= 0:
break
# Prepare combined-mode analysis.
if combined:
# Set interleaved flag on de-interleaved blobs.
if scan_file_names == [':combined:', 'deinterleaved_a.bin', 'deinterleaved_b.bin', 'interleaved_a.bin', 'interleaved_b.bin']:
combined = 'interleaved'
# Commit to only analyzing the large blob.
scan_file_names = ['']
elif header_data:
# Remove header flag file from list.
scan_file_names.remove(':header:')
# Analyze each file.
for scan_file_name in scan_file_names:
# Read file from cache if possible.
scan_file_path = os.path.join(scan_dir_path, scan_file_name)
file_data = files_data.get(scan_file_name, None)
if file_data == None:
# Read up to 16 MB as a safety net.
file_data = util.read_complement(scan_file_path)
# Check for an analyzer which can handle this file.
bonus_analyzer_addons = bonus_analyzer_oroms = None
file_analyzer = None
strings = None
for analyzer in file_analyzers:
# Reset this analyzer.
analyzer.reset()
analyzer._file_path = scan_file_path
# Check if the analyzer can handle this file.
try:
analyzer_result = analyzer.can_handle(file_data, header_data)
except:
# Log an error.
util.log_traceback('searching for analyzers for', os.path.join(scan_dir_path, scan_file_name))
continue
# Move on if the analyzer responded negatively.
if not analyzer_result:
# Extract add-ons and option ROMs from the bonus analyzer.
if bonus_analyzer_addons == None:
bonus_analyzer_addons = analyzer.addons
bonus_analyzer_oroms = analyzer.oroms
continue
# Run strings on the file data if required (only once).
if not strings:
try:
strings = subprocess.run(['strings', '-n8'], input=file_data, stdout=subprocess.PIPE).stdout.decode('ascii', 'ignore').split('\n')
except:
util.log_traceback('running strings on', os.path.join(scan_dir_path, scan_file_name))
continue
# Analyze each string.
try:
for string in strings:
analyzer.analyze_line(string)
except analyzers.AbortAnalysisError:
# Analysis aborted.
pass
except:
# Log an error.
util.log_traceback('analyzing', os.path.join(scan_dir_path, scan_file_name))
continue
# Take this analyzer if it produced a version.
if analyzer.version:
# Clean up version field if an unknown version was returned.
if analyzer.version == '?':
analyzer.version = ''
# Stop looking for analyzers.
file_analyzer = analyzer
break
# Did any analyzer successfully handle this file?
if not file_analyzer:
# Treat this as a standalone PCI option ROM file if BonusAnalyzer found any.
if bonus_analyzer_oroms:
bonus_analyzer_addons = []
file_analyzer = file_analyzers[0]
else:
# Move on to the next file if nothing else.
continue
# Add interleaved flag to add-ons.
if combined == 'interleaved':
bonus_analyzer_addons.append('Interleaved')
# Clean up the file path.
scan_file_path_full = os.path.join(scan_dir_path, scan_file_name)
# Remove combined directories.
found_flag_file = True
while found_flag_file:
# Find archive indicator.
archive_idx = scan_file_path_full.rfind(':' + os.sep)
if archive_idx == -1:
break
# Check if a combined or header flag file exists.
found_flag_file = False
for flag_file in (':combined:', ':header:'):
if os.path.exists(os.path.join(scan_file_path_full[:archive_idx] + ':', flag_file)):
# Trim the directory off.
scan_file_path_full = scan_file_path_full[:archive_idx]
found_flag_file = True
break
scan_file_path = scan_file_path_full[len(scan_base) + len(os.sep):]
# Remove root extraction directory.
slash_index = scan_file_path.find(os.sep)
if slash_index == 1 and scan_file_path[0] == '0':
scan_file_path = scan_file_path[2:]
# De-duplicate and sort add-ons and option ROMs.
addons = list(set(addon.strip() for addon in (analyzer.addons + bonus_analyzer_addons)))
addons.sort()
oroms = list(set(combined_oroms + analyzer.oroms + bonus_analyzer_oroms))
oroms.sort()
# Add names to option ROMs.
previous_vendor = previous_device = None
for x in range(len(oroms)):
# Get vendor and device IDs and names.
vendor_id, device_id = oroms[x]
vendor, device = util.get_pci_id(vendor_id, device_id)
# Skip valid vendor IDs associated to a bogus device ID.
if device == '[Unknown]' and device_id == 0x0000:
oroms[x] = None
continue
# Clean up IDs.
vendor = util.clean_vendor(vendor).strip()
device = util.clean_device(device, vendor).strip()
# De-duplicate vendor names.
if vendor == previous_vendor and vendor != '[Unknown]':
if device == previous_device:
previous_device, device = device, ''
previous_vendor, vendor = vendor, '\u2196' # up-left arrow
else:
previous_device = device
previous_vendor, vendor = vendor, ' ' * len(vendor)
else:
previous_device = device
previous_vendor = vendor
# Format string.
oroms[x] = '[{0:04x}:{1:04x}] {2} {3}'.format(vendor_id, device_id, vendor, device)
# Remove bogus option ROM device ID entries.
while None in oroms:
oroms.remove(None)
# Collect the analyzer's results.
fields = [((type(field) == str) and field.replace('\t', ' ').strip(' \n') or field) for field in [
scan_file_path,
file_analyzer.vendor,
file_analyzer.version,
formatter.split_if_required('\n', file_analyzer.string),
formatter.split_if_required('\n', file_analyzer.signon),
formatter.join_if_required(' ', addons),
formatter.join_if_required('\n', oroms),
]]
# Output the results.
formatter.output_row(fields)
def analyze_process(queue, formatter, scan_base):
"""Main loop for the analysis multiprocessing pool."""
# Set up analyzers.
file_analyzers = [
analyzers.BonusAnalyzer(), # must be the first one
analyzers.AwardPowerAnalyzer(), # must run before AwardAnalyzer
analyzers.ToshibaAnalyzer(), # must run before AwardAnalyzer
analyzers.AwardAnalyzer(), # must run before PhoenixAnalyzer
analyzers.QuadtelAnalyzer(), # must run before PhoenixAnalyzer
analyzers.PhoenixAnalyzer(), # must run before AMIDellAnalyzer and AMIIntelAnalyzer
#analyzers.AMIDellAnalyzer(), # must run before AMIAnalyzer
analyzers.AMIUEFIAnalyzer(), # must run before AMIAnalyzer
analyzers.AMIAnalyzer(), # must run before AMIIntelAnalyzer
analyzers.AMIIntelAnalyzer(),
analyzers.MRAnalyzer(),
# less common BIOSes with no dependencies on the common part begin here #
analyzers.AcerAnalyzer(),
analyzers.AmstradAnalyzer(),
analyzers.CDIAnalyzer(),
analyzers.CentralPointAnalyzer(),
analyzers.ChipsAnalyzer(),
analyzers.CommodoreAnalyzer(),
analyzers.CompaqAnalyzer(),
analyzers.CorebootAnalyzer(),
analyzers.DTKGoldStarAnalyzer(),
analyzers.GeneralSoftwareAnalyzer(),
analyzers.IBMAnalyzer(),
analyzers.InsydeAnalyzer(),
analyzers.IntelUEFIAnalyzer(),
analyzers.JukoAnalyzer(),
analyzers.OlivettiAnalyzer(),
analyzers.SchneiderAnalyzer(),
analyzers.SystemSoftAnalyzer(),
analyzers.TandonAnalyzer(),
analyzers.TinyBIOSAnalyzer(),
analyzers.WhizproAnalyzer(),
analyzers.ZenithAnalyzer(),
]
# Receive work from the queue.
while True:
item = queue.get()
if item == None: # special item to stop the loop
break
analyze_dir(formatter, scan_base, file_analyzers, *item)
def analyze(dir_path, formatter_args, options):
"""Main function for analysis."""
# Initialize output formatter.
output_formats = {
'csv': (formatters.XSVFormatter, ','),
'scsv': (formatters.XSVFormatter, ';'),
'json': formatters.JSONObjectFormatter,
'jsontable': formatters.JSONTableFormatter,
}
formatter = output_formats.get(options['format'], None)
if not formatter:
raise Exception('unknown output format ' + options['format'])
if type(formatter) == tuple:
formatter = formatter[0](*formatter[1:], sys.stdout, options, formatter_args)
else:
formatter = formatter(sys.stdout, options, formatter_args)
# Begin output.
formatter.begin()
formatter.output_headers(['File', 'Vendor', 'Version', 'String', 'Sign-on', 'Add-ons', 'PCI ROMs'], options.get('headers'))
# Remove any trailing slash from the root path, as the output path cleanup
# functions rely on it not being present.
if dir_path[-len(os.sep):] == os.sep:
dir_path = dir_path[:-len(os.sep)]
elif dir_path[-1:] == '/':
dir_path = dir_path[:-1]
# Start multiprocessing pool.
queue = multiprocessing.Queue(maxsize=MP_PROCESS_COUNT)
mp_pool = multiprocessing.Pool(MP_PROCESS_COUNT, initializer=analyze_process, initargs=(queue, formatter, dir_path))
# Scan directory structure.
for scan_dir_path, scan_dir_names, scan_file_names in os.walk(dir_path):
queue.put((scan_dir_path, scan_file_names))
# Stop multiprocessing pool and wait for its workers to finish.
for _ in range(MP_PROCESS_COUNT):
queue.put(None)
mp_pool.close()
mp_pool.join()
# End output.
formatter.end()
return 0
def main():
mode = None
options = {
'array': False,
'format': 'csv',
'headers': True,
'hyperlink': False,
}
args, remainder = getopt.getopt(sys.argv[1:], 'xaf:hnr', ['extract', 'analyze', 'format=', 'hyperlink', 'no-headers', 'array'])
for opt, arg in args:
if opt in ('-x', '--extract'):
mode = 'extract'
elif opt in ('-a', '--analyze'):
mode = 'analyze'
elif opt in ('-f', '--format'):
options['format'] = arg.lower()
elif opt in ('-h', '--hyperlink'):
options['hyperlink'] = True
elif opt in ('-n', '--no-headers'):
options['headers'] = False
elif opt in ('-r', '--array'):
options['array'] = True
if len(remainder) > 0:
if mode == 'extract':
return extract(remainder[0], remainder[1:], options)
elif mode == 'analyze':
return analyze(remainder[0], remainder[1:], options)
usage = '''
Usage: python3 -m biostools -x directory
python3 -m biostools [-f output_format] [-h] [-n] [-r] -a directory [formatter_options]
-x Extract archives and BIOS images recursively in the given directory
-a Analyze extracted BIOS images in the given directory
-f Output format:
csv Comma-separated values with quotes (default)
scsv Semicolon-separated values with quotes
json JSON object array
jsontable JSON table
-h Generate download links for file paths representing HTTP URLs.
csv/scsv: The Excel HYPERLINK formula is used; if you have
non-English Excel, you must provide your language's
HYPERLINK formula name in formatter_options.
-n csv/scsv/jsontable: Don't output column headers.
-r json/jsontable: Output multi-value cells as arrays.
'''
print(usage, file=sys.stderr)
return 1
if __name__ == '__main__':
sys.exit(main())

biostools/analyzers.py Normal file (2320 lines)

File diff suppressed because it is too large.

biostools/extractors.py Normal file (1312 lines)

File diff suppressed because it is too large.

biostools/formatters.py Normal file (199 lines)

@@ -0,0 +1,199 @@
#!/usr/bin/python3
#
# 86Box A hypervisor and IBM PC system emulator that specializes in
# running old operating systems and software designed for IBM
# PC systems and compatibles from 1981 through fairly recent
# system designs based on the PCI bus.
#
# This file is part of the 86Box BIOS Tools distribution.
#
# Data output formatting classes.
#
#
#
# Authors: RichardG, <richardg867@gmail.com>
#
# Copyright 2021 RichardG.
#
import json, os, re
class Formatter:
def __init__(self, out_file, options, args):
"""Initialize a formatter with the given output file and options."""
self.out_file = out_file
self.options = options
self.args = args
self.array = options.get('array')
def begin(self):
"""Begin the formatter's output."""
pass
def end(self):
"""End the formatter's output."""
pass
def get_url(self, columns):
"""Returns the download URL for a given row."""
# Start building the URL.
link_url = columns[0]
# Remove www from original path.
if columns[0][:4] == 'www.':
columns[0] = columns[0][4:]
# Make sure the components are slash-separated.
if os.sep != '/':
link_url = link_url.replace(os.sep, '/')
# Stop at the first decompression layer.
archive_index = link_url.find(':/')
if archive_index > -1:
link_url = link_url[:archive_index]
# Encode the URL.
link_url = link_url.replace('#', '%23')
		link_url = re.sub('''\?([^/]*)/''', '%3F\\1/', link_url)
# Stop if the URL is not valid.
slash_index = link_url.find('/')
if slash_index == -1 or '.' not in link_url[:slash_index]:
return ''
return 'http://' + link_url
def join_if_required(self, c, l):
"""Returns just l if array mode is enabled, or l joined by c otherwise."""
if self.array:
return l
else:
return c.join(l)
def output_headers(self, columns, do_output):
"""Output column headers."""
if do_output:
self.output_row(columns)
def output_row(self, columns):
"""Output an item."""
raise NotImplementedError()
def split_if_required(self, c, s):
"""Returns s split by c if array mode is enabled, or just s otherwise."""
if self.array:
return s.split(c)
else:
return s
class XSVFormatter(Formatter):
def __init__(self, delimiter, *args, **kwargs):
super().__init__(*args, **kwargs)
# Not supported here.
self.array = False
self.delimiter = delimiter
if self.options.get('hyperlink'):
# Get the localized HYPERLINK formula name if specified.
if self.args:
self.hyperlink = self.args[0]
else:
self.hyperlink = 'HYPERLINK'
else:
self.hyperlink = None
def output_row(self, columns):
# Add hyperlink if requested.
output = ''
if self.hyperlink:
link_url = self.get_url(columns)
if link_url:
link_prefix = '=' + self.hyperlink + '(""'
				link_suffix = '""' + self.delimiter + '""\U0001F53D"")' # down-pointing red triangle emoji
# Build and output the final link, accounting for Excel's column size limit.
link = link_prefix + link_url[:256 - len(link_prefix) - len(link_suffix)] + link_suffix
output += '"' + link + '"'
else:
output += '""'
# Add fields.
for field in columns:
if output:
output += self.delimiter
output += '"'
# Account for Excel's column size limit and lack of linebreak support.
output += field.replace('\n', ' - ').replace('"', '""')[:256]
output += '"'
# Add linebreak.
output += '\n'
# Write row.
self.out_file.write(output)
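	# With hyperlinks disabled, a row like ['dir/file.bin', 'Award', '4.51PG']
	# (hypothetical values) is written as: "dir/file.bin","Award","4.51PG"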
class JSONFormatter(Formatter):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.hyperlink = self.options.get('hyperlink')
def begin(self):
# Start root list.
self.out_file.write('[')
self.first_row = True
def end(self):
# End root list.
self.out_file.write(']\n')
def get_json_object(self, columns):
"""Returns the JSON object to be output for this row."""
raise NotImplementedError()
def output_headers(self, columns, do_output):
# Insert URL column if requested.
hyperlink = self.hyperlink
if hyperlink:
columns.insert(0, 'URL')
# Prevent output_row from adding a null header.
self.hyperlink = False
super().output_headers(columns, do_output)
self.hyperlink = hyperlink
def output_row(self, columns):
# Add URL if requested.
if self.hyperlink:
columns.insert(0, self.get_url(columns))
# Write row.
obj = self.get_json_object(columns)
if obj:
if self.first_row:
self.first_row = False
else:
self.out_file.write('\n,')
self.out_file.write(json.dumps(obj))
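	# Rows are therefore streamed as '[' <row> '\n,' <row> ... ']', one JSON
	# value per line, without buffering the whole table in memory.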
class JSONObjectFormatter(JSONFormatter):
def get_json_object(self, columns):
return {self.headers[column_index]: columns[column_index] for column_index in range(len(columns)) if columns[column_index]}
def output_headers(self, columns, do_output):
# Insert URL column if requested.
if self.hyperlink:
columns.insert(0, 'URL')
# Save column headers for later.
self.headers = [column.lower().replace(' ', '').replace('-', '') for column in columns]
class JSONTableFormatter(JSONFormatter):
def get_json_object(self, columns):
return columns

biostools/pciutil.py Normal file (257 lines)

@@ -0,0 +1,257 @@
#!/usr/bin/python3
#
# 86Box A hypervisor and IBM PC system emulator that specializes in
# running old operating systems and software designed for IBM
# PC systems and compatibles from 1981 through fairly recent
# system designs based on the PCI bus.
#
# This file is part of the 86Box BIOS Tools distribution.
#
# Utility library for identifying PCI device/vendor IDs.
#
#
#
# Authors: RichardG, <richardg867@gmail.com>
#
# Copyright 2021 RichardG.
#
import io, re, urllib.request
clean_device_abbr = [
# Generic patterns to catch extended abbreviations: "Abbreviated Terms (AT)"
('([A-Z])[^\s]+ ([A-Z])[^\s]+ (?:\(|\[|\{|/)\\2\\3(?:$|\)|\]|\})', '\\2\\3'),
('([A-Z])[^\s]+ ([A-Z])[^\s]+ ([A-Z])[^\s]+ (?:\(|\[|\{|/)\\2\\3\\4(?:$|\)|\]|\})', '\\2\\3\\4'),
('([A-Z])[^\s]+ ([A-Z])[^\s]+ ([A-Z])[^\s]+ ([A-Z])[^\s]+ (?:\(|\[|\{|/)\\2\\3\\4\\5(?:$|\)|\]|\})', '\\2\\3\\4\\5'),
# Manual patterns
('100Base-TX?', 'FE'),
('1000Base-T', 'GbE'),
('Accelerat(?:ion|or)', 'Accel.'),
('Alert on LAN', 'AoL'),
('\((.+) applications?\)', '(\\2)'), # 8086:105e
('Chipset Family', 'Chipset'),
('Chipset Graphics', 'iGPU'),
('Connection', 'Conn.'),
('DECchip', ''),
('Dual (Lane|Port)', '2-\\2'),
('Fast Ethernet', 'FE'),
('Fibre Channel', 'FC'),
('Function', 'Func.'),
('([0-9]{1,3})G Ethernet', '\\2GbE'),
('(?:([0-9]{1,3}) ?)?(?:G(?:bit|ig) Ethernet|GbE)', '\\2GbE'),
('Graphics Processor', 'GPU'),
('High Definition Audio', 'HDA'),
('Host Adapter', 'HBA'),
('Host Bus Adapter', 'HBA'),
('Host Controller', 'HC'),
('Input/Output', 'I/O'),
('Integrated ([^\s]+) (?:Graphics|GPU)', '\\2 iGPU'), # VIA CLE266
('Integrated (?:Graphics|GPU)', 'iGPU'),
('([0-9]) (lane|port)', '\\2-\\3'),
('Local Area Network', 'LAN'),
('Low Pin Count', 'LPC'),
('Memory Controller Hub', 'MCH'),
('Network (?:Interface )?(?:Adapter|Card|Controller)', 'NIC'),
('NVM Express', 'NVMe'),
('Parallel ATA', 'PATA'),
('PCI(?:-E| Express)', 'PCIe'),
('([^- ]+)[- ]to[- ]([^- ]+)', '\\2-\\3'),
('Platform Controller Hub', 'PCH'),
('Processor Graphics', 'iGPU'),
('Quad (Lane|Port)', '4-\\2'),
('Serial ATA', 'SATA'),
('Serial Attached SCSI', 'SAS'),
('Single (Lane|Port)', '1-\\2'),
('USB ?([0-9])\\.0', 'USB\\2'),
('USB ?([0-9])\\.[0-9] ?Gen([0-9x]+)', 'USB\\2.\\3'),
('USB ?([0-9]\\.[0-9])', 'USB\\2'),
('Virtual Machine', 'VM'),
('Wake on LAN', 'WoL'),
('Wireless LAN', 'WLAN'),
# Generic pattern to remove duplicate abbreviations: "AT (AT)"
('([^ \(\[\{/]+) (?: |\(|\[|\{|/)\\2(?: |\)|\]|\})', '\\2'),
]
clean_device_bit_pattern = re.compile('''( |^|\(|\[|\{|/)(?:([0-9]{1,4}) )?(?:(K)(?:ilo)?|(M)(?:ega)?|(G)(?:iga)?)bit( |$|\)|\]|\})''', re.I)
clean_device_suffix_pattern = re.compile(''' (?:Adapter|Card|Device|(?:Host )?Controller)( (?: [0-9#]+)?|$|\)|\]|\})''', re.I)
clean_vendor_abbr_pattern = re.compile(''' \[([^\]]+)\]''')
clean_vendor_suffix_pattern = re.compile(''' (?:Semiconductors?|(?:Micro)?electronics?|Interactive|Technolog(?:y|ies)|(?:Micro)?systems|Computer(?: works)?|Products|Group|and subsidiaries|of(?: America)?|Co(?:rp(?:oration)?|mpany)?|Inc|LLC|Ltd|GmbH|AB|AG|SA|(?:\(|\[|\{).*)$''', re.I)
clean_vendor_force = {
'National Semiconductor Corporation': 'NSC',
}
clean_vendor_final = {
'Chips and': 'C&T',
'Digital Equipment': 'DEC',
'Microchip Technology/SMSC': 'Microchip/SMSC',
'NVidia/SGS Thomson': 'NVIDIA/ST',
'S3 Graphics': 'S3',
'Silicon Integrated': 'SiS',
'Silicon Motion': 'SMI',
'STMicroelectronics': 'ST',
'Texas Instruments': 'TI',
'VMWare': 'VMware',
}
_clean_device_abbr_cache = []
_pci_vendors = {}
_pci_devices = {}
_pci_subdevices = {}
_pci_classes = {}
_pci_subclasses = {}
_pci_progifs = {}
def clean_device(device, vendor=None):
"""Make a device name more compact if possible."""
# Generate pattern cache if required.
if not _clean_device_abbr_cache:
for pattern, replace in clean_device_abbr:
_clean_device_abbr_cache.append((
re.compile('''(?P<prefix> |^|\(|\[|\{|/)''' + pattern + '''(?P<suffix> |$|\)|\]|\})''', re.I),
'\\g<prefix>' + replace + '\\g<suffix>',
))
# Apply patterns.
device = clean_device_bit_pattern.sub('\\1\\2\\3\\4\\5bit\\6', device)
for pattern, replace in _clean_device_abbr_cache:
device = pattern.sub(replace, device)
device = clean_device_suffix_pattern.sub('\\1', device)
# Remove duplicate vendor ID.
if vendor and device[:len(vendor)] == vendor:
device = device[len(vendor):]
# Remove duplicate spaces.
return ' '.join(device.split())
def clean_vendor(vendor):
"""Make a vendor name more compact if possible."""
# Apply force table.
vendor_force = clean_vendor_force.get(vendor, None)
if vendor_force:
return vendor_force
# Use an abbreviation if the name already includes it.
vendor = vendor.replace(' / ', '/')
match = clean_vendor_abbr_pattern.search(vendor)
if match:
return match.group(1)
# Apply patterns.
match = True
while match:
vendor = vendor.rstrip(' ,.')
match = clean_vendor_suffix_pattern.search(vendor)
if match:
vendor = vendor[:match.start()]
# Apply final cleanup table.
vendor = clean_vendor_final.get(vendor, vendor)
# Remove duplicate spaces.
return ' '.join(vendor.split())
def download_compressed(url, skip_exts=[]):
"""Downloads a file which may be available in compressed versions."""
# Try all files.
for ext, module_name in (('.xz', 'lzma'), ('.bz2', 'bz2'), ('.gz', 'gzip'), (None, None)):
# Skip extension if requested.
if ext in skip_exts:
continue
# Import decompression module if required.
if module_name:
try:
module = __import__(module_name)
except:
continue
# Connect to URL.
try:
f = urllib.request.urlopen(url + (ext or ''), timeout=30)
except:
# Move on to the next file if the connection failed.
continue
# If this is uncompressed, return the file handle as is.
if not module_name:
return f
# Decompress data into a BytesIO object.
try:
return io.BytesIO(module.decompress(f.read()))
except:
# Move on to the next file if decompression failed.
continue
# No success with any files.
raise FileNotFoundError('All attempts to download "{0}" and variants thereof have failed'.format(url))
def get_pci_id(vendor_id, device_id):
"""Get the PCI device vendor and name for vendor_id and device_id."""
# Load PCI ID database if required.
if not _pci_vendors:
load_pci_db()
# Get identification.
vendor = _pci_vendors.get(vendor_id, '').strip()
return vendor or '[Unknown]', _pci_devices.get((vendor_id << 16) | device_id, vendor and '[Unknown]' or '').strip()
def load_pci_db():
"""Loads PCI ID database from disk or the website."""
# Try loading from disk or the website.
try:
f = open('/usr/share/misc/pci.ids', 'rb')
except:
try:
f = download_compressed('https://pci-ids.ucw.cz/v2.2/pci.ids', ['.xz'])
except:
# No sources available.
return
vendor = 0
class_num = subclass_num = None
for line in f:
if len(line) < 2 or line[0] == 35:
continue
elif line[0] == 67: # class
class_num = int(line[2:4], 16)
_pci_classes[class_num] = line[6:-1].decode('utf8', 'ignore')
elif class_num != None: # subclass/progif
if line[1] != 9: # subclass
subclass_num = (class_num << 8) | int(line[1:3], 16)
_pci_subclasses[subclass_num] = line[5:-1].decode('utf8', 'ignore')
else: # progif
progif_num = (subclass_num << 8) | int(line[2:4], 16)
_pci_progifs[progif_num] = line[6:-1].decode('utf8', 'ignore')
elif line[0] != 9: # vendor
vendor = int(line[:4], 16)
_pci_vendors[vendor] = line[6:-1].decode('utf8', 'ignore')
elif line[1] != 9: # device
device = (vendor << 16) | int(line[1:5], 16)
_pci_devices[device] = line[7:-1].decode('utf8', 'ignore')
else: # subdevice
subdevice = (int(line[2:6], 16) << 16) | int(line[7:11], 16)
if device not in _pci_subdevices:
_pci_subdevices[device] = {}
_pci_subdevices[device][subdevice] = line[13:-1].decode('utf8', 'ignore')
f.close()
# Debugging feature.
if __name__ == '__main__':
s = input()
try:
if len(s) in (8, 9):
vendor, device = get_pci_id(int(s[:4], 16), int(s[-4:], 16))
vendor = clean_vendor(vendor)
print(vendor)
print(clean_device(device, vendor))
else:
raise Exception('not id')
except:
print(clean_device(s))

biostools/util.py Normal file (139 lines)

@@ -0,0 +1,139 @@
#!/usr/bin/python3
#
# 86Box A hypervisor and IBM PC system emulator that specializes in
# running old operating systems and software designed for IBM
# PC systems and compatibles from 1981 through fairly recent
# system designs based on the PCI bus.
#
# This file is part of the 86Box BIOS Tools distribution.
#
# Utility functions.
#
#
#
# Authors: RichardG, <richardg867@gmail.com>
#
# Copyright 2021 RichardG.
#
import multiprocessing, os, re, traceback, urllib.request
from biostools.pciutil import *
date_pattern_mmddyy = re.compile('''(?P<month>[0-9]{2})/(?P<day>[0-9]{2})/(?P<year>[0-9]{2,4})''')
_error_log_lock = multiprocessing.Lock()
def all_match(patterns, data):
"""Returns True if all re patterns can be found in data."""
# Python is smart enough to stop generation when a None is found.
return None not in (pattern.search(data) for pattern in patterns)
def date_gt(date1, date2, pattern):
"""Returns True if date1 is greater than date2.
Date format set by the given pattern."""
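	# e.g. date_gt('07/25/95', '01/01/01', date_pattern_mmddyy) is False, as
	# two-digit years below 80 are read as 20xx (1995 vs. 2001 here).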
# Run date regex.
date1_match = pattern.match(date1)
date2_match = pattern.match(date2)
if date1_match:
if not date2_match:
return True
else:
return False
# Extract year, month and day.
date1_year = int(date1_match.group('year'))
date1_month = int(date1_match.group('month'))
date1_day = int(date1_match.group('day'))
date2_year = int(date2_match.group('year'))
date2_month = int(date2_match.group('month'))
date2_day = int(date2_match.group('day'))
# Add century to two-digit years.
if date1_year < 100:
if date1_year < 80:
date1_year += 2000
else:
date1_year += 1900
if date2_year < 100:
if date2_year < 80:
date2_year += 2000
else:
date2_year += 1900
# Perform the comparisons.
if date1_year != date2_year:
return date1_year > date2_year
elif date1_month != date2_month:
return date1_month > date2_month
elif date1_day != date2_day:
return date1_day > date2_day
else:
return False
def log_traceback(*args):
"""Log to biostools_error.log, including any outstanding traceback."""
elems = ['===[ While']
for elem in args:
elems.append(str(elem))
elems.append(']===\n')
output = ' '.join(elems)
with _error_log_lock:
f = open('biostools_error.log', 'a')
f.write(output)
traceback.print_exc(file=f)
f.close()
def read_complement(file_path, file_header=None, max_size=16777216):
"""Read up to max_size from file_path starting at the end of file_header.
Usage: file_header += read_complement(file_path, file_header)"""
try:
f = open(file_path, 'rb')
if file_header:
f.seek(len(file_header))
ret = f.read(max_size - len(file_header))
else:
ret = f.read(max_size)
f.close()
return ret
except:
return b''
def read_string(data, terminator=b'\x00'):
"""Read a terminated string (by NUL by default) from a bytes."""
terminator_index = data.find(terminator)
if terminator_index > -1:
data = data[:terminator_index]
return data.decode('cp437', 'ignore')
def rmdirs(dir_path):
"""Remove empty dir_path, also removing any parent directory which ends up empty."""
removed_count = 0
while True:
try:
os.rmdir(dir_path)
removed_count += 1
dir_path = os.path.dirname(dir_path)
except OSError:
break
except:
continue
return removed_count
def remove_extension(file_name):
"""Remove file_name's extension, if one is present."""
extension_index = file_name.rfind('.')
if extension_index > -1:
return file_name[:extension_index]
else:
return file_name
def try_makedirs(dir_path):
"""Try to create dir_path. Returns True if successful, False if not."""
try:
os.makedirs(dir_path)
except:
pass
return os.path.isdir(dir_path)

requirements.txt Normal file (1 line)

@@ -0,0 +1 @@
Pillow>=8