Commit 41dc863b by devttys0

Added raw lzma extractor to compressor module

parent 82fb2f68
......@@ -16,32 +16,45 @@ class LZMAHeader(object):
class LZMA(object):
DESCRIPTION = "Raw LZMA compression stream"
FAKE_SIZE = b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"
COMMON_PROPERTIES = [0x5D, 0x6E]
MAX_PROP = ((4 * 5 + 4) * 9 + 8)
BLOCK_SIZE = 32*1024
def __init__(self, module):
self.module = module
self.decompressed_data = None
self.properties = None
self.build_properties()
self.build_dictionaries()
self.build_headers()
# Add an extraction rule
#if self.module.extractor.enabled:
# self.module.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="lzma", cmd=self.extractor)
if self.module.extractor.enabled:
self.module.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="7z", cmd=self.extractor)
# TODO: Reliable extraction is horribly inefficient with the Python lzma module
#def extractor(self, file_name):
# compressed_data = binwalk.core.common.BlockFile(file_name).read()
def extractor(self, file_name):
# Open and read the file containing the raw compressed data.
# This is not terribly efficient, especially for large files...
compressed_data = binwalk.core.common.BlockFile(file_name).read()
# Re-run self.decompress to detect the properties for this compressed data (stored in self.properties)
if self.decompress(compressed_data[:self.BLOCK_SIZE]):
# Build an LZMA header on top of the raw compressed data and write it back to disk.
# Header consists of the detected properties values, the largest possible dictionary size,
# and a fake output file size field.
header = chr(self.properties) + self.dictionaries[-1] + ("\xFF" * 8)
binwalk.core.common.BlockFile(file_name, "wb").write(header + compressed_data)
# Try to extract it with all the normal lzma extractors until one works
for exrule in self.module.extractor.match("lzma compressed data"):
if self.module.extractor.execute(exrule['cmd'], file_name) == True:
break
def build_property(self, pb, lp, lc):
prop = (((pb * 5) + lp) * 9) + lc
if prop > self.MAX_PROP:
prop = None
return prop
return int(prop)
def parse_property(self, prop):
prop = int(ord(prop))
......@@ -65,6 +78,7 @@ class LZMA(object):
self.properties = set()
if self.module.partial_scan == True:
# For partial scans, only check the most common properties values
for prop in self.COMMON_PROPERTIES:
self.properties.add(chr(prop))
else:
......@@ -79,7 +93,8 @@ class LZMA(object):
self.dictionaries = []
if self.module.partial_scan == True:
self.dictionaries.append(struct.pack("<I", 2**16))
# For partial scans, only use the largest dictionary value
self.dictionaries.append(binwalk.core.compat.bytes2str(struct.pack("<I", 2**25)))
else:
for n in range(16, 26):
self.dictionaries.append(binwalk.core.compat.bytes2str(struct.pack("<I", 2**n)))
......@@ -89,12 +104,11 @@ class LZMA(object):
for prop in self.properties:
for dictionary in self.dictionaries:
self.headers.add(prop + dictionary + self.FAKE_SIZE)
self.headers.add(prop + dictionary + ("\xFF" * 8))
def decompress(self, data, complete=False):
def decompress(self, data):
result = None
description = None
self.decompressed_data = None
i = 0
for header in self.headers:
......@@ -102,27 +116,25 @@ class LZMA(object):
# The only acceptable exceptions are those indicating that the input data was truncated.
try:
final_data = binwalk.core.compat.str2bytes(header + data)
if complete:
open("test.bin-%d" % i, "wb").write(final_data)
self.decompressed_data = lzma.decompress(final_data)
lzma.decompress(final_data)
result = self.parse_header(header)
break
except IOError as e:
# The Python2 module gives this error on truncated input data.
if not complete and str(e) == "unknown BUF error":
if str(e) == "unknown BUF error":
result = self.parse_header(header)
break
except Exception as e:
# The Python3 module gives this error on truncated input data.
# The inconsistency between modules is a bit worrisome.
if not complete and str(e) == "Compressed data ended before the end-of-stream marker was reached":
if str(e) == "Compressed data ended before the end-of-stream marker was reached":
result = self.parse_header(header)
break
if result is not None:
prop = self.build_property(result.pb, result.lp, result.lc)
self.properties = self.build_property(result.pb, result.lp, result.lc)
description = "%s, properties: 0x%.2X [pb: %d, lp: %d, lc: %d], dictionary size: %d" % (self.DESCRIPTION,
prop,
self.properties,
result.pb,
result.lp,
result.lc,
......
......@@ -348,7 +348,7 @@ class Extractor(Module):
fname = ''
cleanup_extracted_fname = True
original_dir = os.getcwd()
rules = self._match(description)
rules = self.match(description)
file_path = os.path.realpath(file_name)
# No extraction rules for this file
......@@ -451,7 +451,7 @@ class Extractor(Module):
return offset
return -1
def _match(self, description):
def match(self, description):
'''
Check to see if the provided description string matches an extract rule.
Called internally by self.extract().
......
......@@ -17,19 +17,22 @@ class LZMAModPlugin(binwalk.core.plugin.Plugin):
def init(self):
self.original_cmd = ''
# Replace the existing LZMA extraction command with our own
# Note that this assumes that there is *one* LZMA extraction command...
rules = self.module.extractor.get_rules()
for i in range(0, len(rules)):
if rules[i]['regex'] and rules[i]['cmd'] and rules[i]['regex'].match(self.SIGNATURE):
self.original_cmd = rules[i]['cmd']
rules[i]['cmd'] = self.lzma_cable_extractor
break
# Replace the first existing LZMA extraction command with our own
for rule in self.module.extractor.match(self.SIGNATURE):
self.original_cmd = rule['cmd']
rule['cmd'] = self.lzma_cable_extractor
break
#rules = self.module.extractor.get_rules()
#for i in range(0, len(rules)):
# if rules[i]['regex'] and rules[i]['cmd'] and rules[i]['regex'].match(self.SIGNATURE):
# self.original_cmd = rules[i]['cmd']
# rules[i]['cmd'] = self.lzma_cable_extractor
# break
def lzma_cable_extractor(self, fname):
# Try extracting the LZMA file without modification first
result = self.module.extractor.execute(self.original_cmd, fname)
# If the external extractor was successul (True) or didn't exist (None), don't do anything.
if result not in [True, None]:
out_name = os.path.splitext(fname)[0] + '-patched' + os.path.splitext(fname)[1]
......@@ -41,14 +44,14 @@ class LZMAModPlugin(binwalk.core.plugin.Plugin):
while i < fp_in.length:
(data, dlen) = fp_in.read_block()
if i == 0:
out_data = data[0:5] + self.FAKE_LZMA_SIZE + data[5:]
else:
out_data = data
fp_out.write(out_data)
i += dlen
fp_in.close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment