bruteforce: Add zlib decompression methods

This commit is contained in:
RichardG867
2025-05-12 11:50:37 -03:00
parent f971f6222f
commit df2735934e

View File

@@ -1,6 +1,6 @@
import getopt, os, queue, subprocess, sys, threading, time
import getopt, itertools, os, queue, subprocess, sys, threading, time, zlib
algos = ['lh5', 'lzari', 'lzh', 'notlzss', 'notlzari', 'notlzh']
algos = ['lh5', 'lzari', 'lzh', 'notlzss', 'notlzari', 'notlzh'] + ['zl' + str(x) for x in itertools.chain(range(8, 16), [0], range(-15, -7), range(24, 32), range(40, 48))]
longest_algo = 0
thread_status = []
term_size = os.get_terminal_size()
@@ -55,6 +55,8 @@ def work_thread(thread_id, options, q):
thread_status[thread_id] = 'init'
# Read file into buffer 0.
with open(options['file_path'], 'rb') as f:
data = f.read()
proc.stdin.write('0;0\n{0}\n'.format(options['file_path']).encode('utf8', 'ignore'))
proc.stdin.flush()
@@ -81,61 +83,76 @@ def work_thread(thread_id, options, q):
return
offset, algo = item
is_zl = algos[algo][:2] == 'zl'
# Request decompression.
proc.stdin.write('3;0;{0};{1};1;0;{2};{3}\n'.format(offset, file_size - offset, options['decomp_size'], algo).encode('utf8', 'ignore'))
proc.stdin.flush()
if not is_zl:
proc.stdin.write('3;0;{0};{1};1;0;{2};{3}\n'.format(offset, file_size - offset, options['decomp_size'], algo).encode('utf8', 'ignore'))
proc.stdin.flush()
# Set thread status.
thread_status[thread_id] = algos[algo].ljust(longest_algo) + ' ' + hex(offset)[2:]
# Read result.
try:
decompressed = proc.stdout.readline()
except:
decompressed = b''
if decompressed:
decompressed = int(decompressed.strip())
if is_zl:
try:
decompressed_data = zlib.decompress(data[offset:], wbits=int(algos[algo][2:]))
decompressed = len(decompressed_data)
except:
decompressed_data = b''
decompressed = 0
else:
try:
proc.kill()
decompressed = proc.stdout.readline()
except:
pass
break
decompressed = b''
if decompressed:
decompressed = int(decompressed.strip())
else:
try:
proc.kill()
except:
pass
break
# Check for success.
found = None
if options['byte_search']:
# Request byte search.
proc.stdin.write('4;1;0;{0};{1}\n'.format(options['check_length'], len(options['byte_search'])).encode('utf8', 'ignore'))
proc.stdin.write(options['byte_search'])
proc.stdin.flush()
if is_zl:
found_offset = decompressed_data[:options['check_length']].find(options['byte_search'])
else:
# Request byte search.
proc.stdin.write('4;1;0;{0};{1}\n'.format(options['check_length'], len(options['byte_search'])).encode('utf8', 'ignore'))
proc.stdin.write(options['byte_search'])
proc.stdin.flush()
# Read result.
found_offset = int(proc.stdout.readline().strip())
# Read result.
found_offset = int(proc.stdout.readline().strip())
# Print result.
if found_offset > -1:
found = '{0}+{1}'.format(hex(offset)[2:], hex(found_offset)[2:])
elif decompressed >= options['check_length']:
found = hex(offset)[2:]
if found:
# Request buffer read.
if decompressed > 0:
request_size = decompressed
else:
request_size = buf_size
proc.stdin.write('2;1;0;{0}\n'.format(request_size).encode('utf8', 'ignore'))
proc.stdin.flush()
with open(output_base + '.' + found, 'wb') as f:
if is_zl:
f.write(decompressed_data)
else:
# Request buffer read.
if decompressed > 0:
request_size = decompressed
else:
request_size = buf_size
proc.stdin.write('2;1;0;{0}\n'.format(request_size).encode('utf8', 'ignore'))
proc.stdin.flush()
# Read data length.
data_length = int(proc.stdout.readline().strip())
# Read data length.
data_length = int(proc.stdout.readline().strip())
# Write buffer data to file.
f = open(output_base + '.' + found, 'wb')
f.write(proc.stdout.read(data_length))
f.close()
# Write buffer data to file.
f.write(proc.stdout.read(data_length))
with print_lock:
s = algos[algo] + ' ' + found