Commit a5217d62 by devttys0

s/\t/ /g

parent 84e83d0f
......@@ -6,126 +6,126 @@ import ctypes.util
from binwalk.core.compat import *
class Function(object):
'''
Container class for defining library functions.
'''
def __init__(self, **kwargs):
self.name = None
self.type = int
'''
Container class for defining library functions.
'''
def __init__(self, **kwargs):
self.name = None
self.type = int
for (k, v) in iterator(kwargs):
setattr(self, k, v)
for (k, v) in iterator(kwargs):
setattr(self, k, v)
class FunctionHandler(object):
'''
Class for abstracting function calls via ctypes and handling Python 2/3 compatibility issues.
'''
PY2CTYPES = {
bytes : ctypes.c_char_p,
str : ctypes.c_char_p,
int : ctypes.c_int,
float : ctypes.c_float,
bool : ctypes.c_int,
None : ctypes.c_int,
}
RETVAL_CONVERTERS = {
None : int,
int : int,
float : float,
bool : bool,
str : bytes2str,
bytes : str2bytes,
}
def __init__(self, library, function):
'''
Class constructor.
@library - Library handle as returned by ctypes.cdll.LoadLibrary.
@function - An instance of the binwalk.core.C.Function class.
Returns None.
'''
self.retype = function.type
self.function = getattr(library, function.name)
if has_key(self.PY2CTYPES, self.retype):
self.function.restype = self.PY2CTYPES[self.retype]
self.retval_converter = self.RETVAL_CONVERTERS[self.retype]
else:
raise Exception("Unknown return type: '%s'" % self.retype)
def run(self, *args):
'''
Executes the library function, handling Python 2/3 compatibility and properly converting the return type.
@*args - Library function arguments.
Returns the return value from the libraray function.
'''
args = list(args)
# Python3 expects a bytes object for char *'s, not a str.
# This allows us to pass either, regardless of the Python version.
for i in range(0, len(args)):
if isinstance(args[i], str):
args[i] = str2bytes(args[i])
return self.retval_converter(self.function(*args))
'''
Class for abstracting function calls via ctypes and handling Python 2/3 compatibility issues.
'''
PY2CTYPES = {
bytes : ctypes.c_char_p,
str : ctypes.c_char_p,
int : ctypes.c_int,
float : ctypes.c_float,
bool : ctypes.c_int,
None : ctypes.c_int,
}
RETVAL_CONVERTERS = {
None : int,
int : int,
float : float,
bool : bool,
str : bytes2str,
bytes : str2bytes,
}
def __init__(self, library, function):
'''
Class constructor.
@library - Library handle as returned by ctypes.cdll.LoadLibrary.
@function - An instance of the binwalk.core.C.Function class.
Returns None.
'''
self.retype = function.type
self.function = getattr(library, function.name)
if has_key(self.PY2CTYPES, self.retype):
self.function.restype = self.PY2CTYPES[self.retype]
self.retval_converter = self.RETVAL_CONVERTERS[self.retype]
else:
raise Exception("Unknown return type: '%s'" % self.retype)
def run(self, *args):
'''
Executes the library function, handling Python 2/3 compatibility and properly converting the return type.
@*args - Library function arguments.
Returns the return value from the libraray function.
'''
args = list(args)
# Python3 expects a bytes object for char *'s, not a str.
# This allows us to pass either, regardless of the Python version.
for i in range(0, len(args)):
if isinstance(args[i], str):
args[i] = str2bytes(args[i])
return self.retval_converter(self.function(*args))
class Library(object):
'''
Class for loading the specified library via ctypes.
'''
'''
Class for loading the specified library via ctypes.
'''
def __init__(self, library, functions):
'''
Class constructor.
def __init__(self, library, functions):
'''
Class constructor.
@library - Library name (e.g., 'magic' for libmagic).
@functions - A dictionary of function names and their return types (e.g., {'magic_buffer' : str})
@library - Library name (e.g., 'magic' for libmagic).
@functions - A dictionary of function names and their return types (e.g., {'magic_buffer' : str})
Returns None.
'''
self.library = ctypes.cdll.LoadLibrary(self.find_library(library))
if not self.library:
raise Exception("Failed to load library '%s'" % library)
Returns None.
'''
self.library = ctypes.cdll.LoadLibrary(self.find_library(library))
if not self.library:
raise Exception("Failed to load library '%s'" % library)
for function in functions:
f = FunctionHandler(self.library, function)
setattr(self, function.name, f.run)
for function in functions:
f = FunctionHandler(self.library, function)
setattr(self, function.name, f.run)
def find_library(self, library):
'''
Locates the specified library.
def find_library(self, library):
'''
Locates the specified library.
@library - Library name (e.g., 'magic' for libmagic).
@library - Library name (e.g., 'magic' for libmagic).
Returns a string to be passed to ctypes.cdll.LoadLibrary.
'''
lib_path = None
system_paths = {
'linux' : ['/usr/local/lib/lib%s.so' % library],
'linux2' : ['/usr/local/lib/lib%s.so' % library],
'linux3' : ['/usr/local/lib/lib%s.so' % library],
'darwin' : ['/opt/local/lib/lib%s.dylib' % library,
'/usr/local/lib/lib%s.dylib' % library,
] + glob.glob('/usr/local/Cellar/lib%s/*/lib/lib%s.dylib' % (library, library)),
'win32' : ['%s.dll' % library]
}
lib_path = ctypes.util.find_library(library)
if not lib_path:
for path in system_paths[sys.platform]:
if os.path.exists(path):
lib_path = path
break
if not lib_path:
raise Exception("Failed to locate library '%s'" % library)
return lib_path
Returns a string to be passed to ctypes.cdll.LoadLibrary.
'''
lib_path = None
system_paths = {
'linux' : ['/usr/local/lib/lib%s.so' % library],
'linux2' : ['/usr/local/lib/lib%s.so' % library],
'linux3' : ['/usr/local/lib/lib%s.so' % library],
'darwin' : ['/opt/local/lib/lib%s.dylib' % library,
'/usr/local/lib/lib%s.dylib' % library,
] + glob.glob('/usr/local/Cellar/lib%s/*/lib/lib%s.dylib' % (library, library)),
'win32' : ['%s.dll' % library]
}
lib_path = ctypes.util.find_library(library)
if not lib_path:
for path in system_paths[sys.platform]:
if os.path.exists(path):
lib_path = path
break
if not lib_path:
raise Exception("Failed to locate library '%s'" % library)
return lib_path
......@@ -7,68 +7,68 @@ import string
PY_MAJOR_VERSION = sys.version_info[0]
if PY_MAJOR_VERSION > 2:
string.letters = string.ascii_letters
string.letters = string.ascii_letters
def iterator(dictionary):
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
'''
if PY_MAJOR_VERSION > 2:
return dictionary.items()
else:
return dictionary.iteritems()
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
'''
if PY_MAJOR_VERSION > 2:
return dictionary.items()
else:
return dictionary.iteritems()
def has_key(dictionary, key):
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
'''
if PY_MAJOR_VERSION > 2:
return key in dictionary
else:
return dictionary.has_key(key)
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
'''
if PY_MAJOR_VERSION > 2:
return key in dictionary
else:
return dictionary.has_key(key)
def get_keys(dictionary):
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
'''
if PY_MAJOR_VERSION > 2:
return list(dictionary.keys())
else:
return dictionary.keys()
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
'''
if PY_MAJOR_VERSION > 2:
return list(dictionary.keys())
else:
return dictionary.keys()
def str2bytes(string):
'''
For cross compatibility between Python 2 and Python 3 strings.
'''
if isinstance(string, type('')) and PY_MAJOR_VERSION > 2:
return bytes(string, 'latin1')
else:
return string
'''
For cross compatibility between Python 2 and Python 3 strings.
'''
if isinstance(string, type('')) and PY_MAJOR_VERSION > 2:
return bytes(string, 'latin1')
else:
return string
def bytes2str(bs):
'''
For cross compatibility between Python 2 and Python 3 strings.
'''
if isinstance(bs, type(b'')) and PY_MAJOR_VERSION > 2:
return bs.decode('latin1')
else:
return bs
'''
For cross compatibility between Python 2 and Python 3 strings.
'''
if isinstance(bs, type(b'')) and PY_MAJOR_VERSION > 2:
return bs.decode('latin1')
else:
return bs
def string_decode(string):
'''
For cross compatibility between Python 2 and Python 3 strings.
'''
if PY_MAJOR_VERSION > 2:
return bytes(string, 'utf-8').decode('unicode_escape')
else:
return string.decode('string_escape')
'''
For cross compatibility between Python 2 and Python 3 strings.
'''
if PY_MAJOR_VERSION > 2:
return bytes(string, 'utf-8').decode('unicode_escape')
else:
return string.decode('string_escape')
def user_input(prompt=''):
'''
For getting raw user input in Python 2 and 3.
'''
if PY_MAJOR_VERSION > 2:
return input(prompt)
else:
return raw_input(prompt)
'''
For getting raw user input in Python 2 and 3.
'''
if PY_MAJOR_VERSION > 2:
return input(prompt)
else:
return raw_input(prompt)
0 belong x Hex: 0x%.8X
#0 string x String: %s
#0 lequad x Little Endian Quad: %lld
#0 bequad x Big Endian Quad: %lld
0 lelong x Little Endian Long: %d
0 belong x Big Endian Long: %d
0 leshort x Little Endian Short: %d
0 beshort x Big Endian Short: %d
0 ledate x Little Endian Date: %s
0 bedate x Big Endian Date: %s
......@@ -5,87 +5,87 @@ import binwalk.core.C
from binwalk.core.module import Option, Kwarg, Module
class Deflate(object):
'''
Finds and extracts raw deflate compression streams.
'''
'''
Finds and extracts raw deflate compression streams.
'''
ENABLED = False
BLOCK_SIZE = 33*1024
# To prevent many false positives, only show data that decompressed to a reasonable size and didn't just result in a bunch of NULL bytes
MIN_DECOMP_SIZE = 32*1024
DESCRIPTION = "Raw deflate compression stream"
ENABLED = False
BLOCK_SIZE = 33*1024
# To prevent many false positives, only show data that decompressed to a reasonable size and didn't just result in a bunch of NULL bytes
MIN_DECOMP_SIZE = 32*1024
DESCRIPTION = "Raw deflate compression stream"
TINFL_NAME = "tinfl"
TINFL_NAME = "tinfl"
TINFL_FUNCTIONS = [
binwalk.core.C.Function(name="is_deflated", type=int),
binwalk.core.C.Function(name="inflate_raw_file", type=None),
]
TINFL_FUNCTIONS = [
binwalk.core.C.Function(name="is_deflated", type=int),
binwalk.core.C.Function(name="inflate_raw_file", type=None),
]
def __init__(self, module):
self.module = module
def __init__(self, module):
self.module = module
# The tinfl library is built and installed with binwalk
self.tinfl = binwalk.core.C.Library(self.TINFL_NAME, self.TINFL_FUNCTIONS)
# Add an extraction rule
if self.module.extractor.enabled:
self.module.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="deflate", cmd=self._extractor)
# The tinfl library is built and installed with binwalk
self.tinfl = binwalk.core.C.Library(self.TINFL_NAME, self.TINFL_FUNCTIONS)
# Add an extraction rule
if self.module.extractor.enabled:
self.module.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="deflate", cmd=self._extractor)
def _extractor(self, file_name):
out_file = os.path.splitext(file_name)[0]
self.tinfl.inflate_raw_file(file_name, out_file)
def _extractor(self, file_name):
out_file = os.path.splitext(file_name)[0]
self.tinfl.inflate_raw_file(file_name, out_file)
def decompress(self, data):
description = None
def decompress(self, data):
description = None
decomp_size = self.tinfl.is_deflated(data, len(data), 0)
if decomp_size >= self.MIN_DECOMP_SIZE:
description = self.DESCRIPTION + ', uncompressed size >= %d' % decomp_size
decomp_size = self.tinfl.is_deflated(data, len(data), 0)
if decomp_size >= self.MIN_DECOMP_SIZE:
description = self.DESCRIPTION + ', uncompressed size >= %d' % decomp_size
return description
return description
class RawCompression(Module):
DECOMPRESSORS = {
'deflate' : Deflate,
}
DECOMPRESSORS = {
'deflate' : Deflate,
}
TITLE = 'Raw Compression'
TITLE = 'Raw Compression'
CLI = [
Option(short='X',
long='deflate',
kwargs={'enabled' : True, 'decompressor_class' : 'deflate'},
description='Scan for raw deflate compression streams'),
]
CLI = [
Option(short='X',
long='deflate',
kwargs={'enabled' : True, 'decompressor_class' : 'deflate'},
description='Scan for raw deflate compression streams'),
]
KWARGS = [
Kwarg(name='enabled', default=False),
Kwarg(name='decompressor_class', default=None),
]
KWARGS = [
Kwarg(name='enabled', default=False),
Kwarg(name='decompressor_class', default=None),
]
def init(self):
self.decompressor = self.DECOMPRESSORS[self.decompressor_class](self)
def init(self):
self.decompressor = self.DECOMPRESSORS[self.decompressor_class](self)
def run(self):
for fp in iter(self.next_file, None):
def run(self):
for fp in iter(self.next_file, None):
fp.set_block_size(peek=self.decompressor.BLOCK_SIZE)
fp.set_block_size(peek=self.decompressor.BLOCK_SIZE)
self.header()
self.header()
while True:
(data, dlen) = fp.read_block()
if not data:
break
while True:
(data, dlen) = fp.read_block()
if not data:
break
for i in range(0, dlen):
description = self.decompressor.decompress(data[i:i+self.decompressor.BLOCK_SIZE])
if description:
self.result(description=description, file=fp, offset=fp.tell()-dlen+i)
for i in range(0, dlen):
description = self.decompressor.decompress(data[i:i+self.decompressor.BLOCK_SIZE])
if description:
self.result(description=description, file=fp, offset=fp.tell()-dlen+i)
self.status.completed = fp.tell() - fp.offset
self.status.completed = fp.tell() - fp.offset
self.footer()
self.footer()
......@@ -8,186 +8,186 @@ from binwalk.core.compat import *
from binwalk.core.module import Module, Kwarg, Option, Dependency
class ChiSquare(object):
'''
Performs a Chi Squared test against the provided data.
'''
'''
Performs a Chi Squared test against the provided data.
'''
IDEAL = 256.0
IDEAL = 256.0
def __init__(self):
'''
Class constructor.
def __init__(self):
'''
Class constructor.
Returns None.
'''
self.bytes = {}
self.freedom = self.IDEAL - 1
# Initialize the self.bytes dictionary with keys for all possible byte values (0 - 255)
for i in range(0, int(self.IDEAL)):
self.bytes[chr(i)] = 0
self.reset()
Returns None.
'''
self.bytes = {}
self.freedom = self.IDEAL - 1
# Initialize the self.bytes dictionary with keys for all possible byte values (0 - 255)
for i in range(0, int(self.IDEAL)):
self.bytes[chr(i)] = 0
self.reset()
def reset(self):
self.xc2 = 0.0
self.byte_count = 0
def reset(self):
self.xc2 = 0.0
self.byte_count = 0
for key in self.bytes.keys():
self.bytes[key] = 0
for key in self.bytes.keys():
self.bytes[key] = 0
def update(self, data):
'''
Updates the current byte counts with new data.
def update(self, data):
'''
Updates the current byte counts with new data.
@data - String of bytes to update.
@data - String of bytes to update.
Returns None.
'''
# Count the number of occurances of each byte value
for i in data:
self.bytes[i] += 1
Returns None.
'''
# Count the number of occurances of each byte value
for i in data:
self.bytes[i] += 1
self.byte_count += len(data)
self.byte_count += len(data)
def chisq(self):
'''
Calculate the Chi Square critical value.
def chisq(self):
'''
Calculate the Chi Square critical value.
Returns the critical value.
'''
expected = self.byte_count / self.IDEAL
Returns the critical value.
'''
expected = self.byte_count / self.IDEAL
if expected:
for byte in self.bytes.values():
self.xc2 += ((byte - expected) ** 2 ) / expected
if expected:
for byte in self.bytes.values():
self.xc2 += ((byte - expected) ** 2 ) / expected
return self.xc2
return self.xc2
class EntropyBlock(object):
def __init__(self, **kwargs):
self.start = None
self.end = None
self.length = None
for (k,v) in iterator(kwargs):
setattr(self, k, v)
def __init__(self, **kwargs):
self.start = None
self.end = None
self.length = None
for (k,v) in iterator(kwargs):
setattr(self, k, v)
class HeuristicCompressionAnalyzer(Module):
'''
Performs analysis and attempts to interpret the results.
'''
BLOCK_SIZE = 32
CHI_CUTOFF = 512
ENTROPY_TRIGGER = .90
MIN_BLOCK_SIZE = 4096
BLOCK_OFFSET = 1024
ENTROPY_BLOCK_SIZE = 1024
TITLE = "Heuristic Compression"
DEPENDS = [
Dependency(name='Entropy',
attribute='entropy',
kwargs={'enabled' : True, 'do_plot' : False, 'display_results' : False, 'block_size' : ENTROPY_BLOCK_SIZE}),
]
CLI = [
Option(short='H',
long='heuristic',
kwargs={'enabled' : True},
description='Heuristically classify high entropy data'),
Option(short='a',
long='trigger',
kwargs={'trigger_level' : 0},
type=float,
description='Set the entropy trigger level (0.0 - 1.0)'),
]
KWARGS = [
Kwarg(name='enabled', default=False),
Kwarg(name='trigger_level', default=ENTROPY_TRIGGER),
]
def init(self):
self.blocks = {}
self.HEADER[-1] = "HEURISTIC ENTROPY ANALYSIS"
if self.config.block:
self.block_size = self.config.block
else:
self.block_size = self.BLOCK_SIZE
for result in self.entropy.results:
if not has_key(self.blocks, result.file.name):
self.blocks[result.file.name] = []
if result.entropy >= self.trigger_level and (not self.blocks[result.file.name] or self.blocks[result.file.name][-1].end is not None):
self.blocks[result.file.name].append(EntropyBlock(start=result.offset + self.BLOCK_OFFSET))
elif result.entropy < self.trigger_level and self.blocks[result.file.name] and self.blocks[result.file.name][-1].end is None:
self.blocks[result.file.name][-1].end = result.offset - self.BLOCK_OFFSET
def run(self):
for fp in iter(self.next_file, None):
if has_key(self.blocks, fp.name):
self.header()
for block in self.blocks[fp.name]:
if block.end is None:
block.length = fp.offset + fp.length - block.start
else:
block.length = block.end - block.start
if block.length >= self.MIN_BLOCK_SIZE:
self.analyze(fp, block)
self.footer()
def analyze(self, fp, block):
'''
Perform analysis and interpretation.
'''
i = 0
num_error = 0
analyzer_results = []
chi = ChiSquare()
fp.seek(block.start)
while i < block.length:
j = 0
(d, dlen) = fp.read_block()
if not d:
break
while j < dlen:
chi.reset()
data = d[j:j+self.block_size]
if len(data) < self.block_size:
break
chi.update(data)
if chi.chisq() >= self.CHI_CUTOFF:
num_error += 1
j += self.block_size
if (j + i) > block.length:
break
i += dlen
if num_error > 0:
verdict = 'Moderate entropy data, best guess: compressed'
else:
verdict = 'High entropy data, best guess: encrypted'
desc = '%s, size: %d, %d low entropy blocks' % (verdict, block.length, num_error)
self.result(offset=block.start, description=desc, file=fp)
'''
Performs analysis and attempts to interpret the results.
'''
BLOCK_SIZE = 32
CHI_CUTOFF = 512
ENTROPY_TRIGGER = .90
MIN_BLOCK_SIZE = 4096
BLOCK_OFFSET = 1024
ENTROPY_BLOCK_SIZE = 1024
TITLE = "Heuristic Compression"
DEPENDS = [
Dependency(name='Entropy',
attribute='entropy',
kwargs={'enabled' : True, 'do_plot' : False, 'display_results' : False, 'block_size' : ENTROPY_BLOCK_SIZE}),
]
CLI = [
Option(short='H',
long='heuristic',
kwargs={'enabled' : True},
description='Heuristically classify high entropy data'),
Option(short='a',
long='trigger',
kwargs={'trigger_level' : 0},
type=float,
description='Set the entropy trigger level (0.0 - 1.0)'),
]
KWARGS = [
Kwarg(name='enabled', default=False),
Kwarg(name='trigger_level', default=ENTROPY_TRIGGER),
]
def init(self):
self.blocks = {}
self.HEADER[-1] = "HEURISTIC ENTROPY ANALYSIS"
if self.config.block:
self.block_size = self.config.block
else:
self.block_size = self.BLOCK_SIZE
for result in self.entropy.results:
if not has_key(self.blocks, result.file.name):
self.blocks[result.file.name] = []
if result.entropy >= self.trigger_level and (not self.blocks[result.file.name] or self.blocks[result.file.name][-1].end is not None):
self.blocks[result.file.name].append(EntropyBlock(start=result.offset + self.BLOCK_OFFSET))
elif result.entropy < self.trigger_level and self.blocks[result.file.name] and self.blocks[result.file.name][-1].end is None:
self.blocks[result.file.name][-1].end = result.offset - self.BLOCK_OFFSET
def run(self):
for fp in iter(self.next_file, None):
if has_key(self.blocks, fp.name):
self.header()
for block in self.blocks[fp.name]:
if block.end is None:
block.length = fp.offset + fp.length - block.start
else:
block.length = block.end - block.start
if block.length >= self.MIN_BLOCK_SIZE:
self.analyze(fp, block)
self.footer()
def analyze(self, fp, block):
'''
Perform analysis and interpretation.
'''
i = 0
num_error = 0
analyzer_results = []
chi = ChiSquare()
fp.seek(block.start)
while i < block.length:
j = 0
(d, dlen) = fp.read_block()
if not d:
break
while j < dlen:
chi.reset()
data = d[j:j+self.block_size]
if len(data) < self.block_size:
break
chi.update(data)
if chi.chisq() >= self.CHI_CUTOFF:
num_error += 1
j += self.block_size
if (j + i) > block.length:
break
i += dlen
if num_error > 0:
verdict = 'Moderate entropy data, best guess: compressed'
else:
verdict = 'High entropy data, best guess: encrypted'
desc = '%s, size: %d, %d low entropy blocks' % (verdict, block.length, num_error)
self.result(offset=block.start, description=desc, file=fp)
......@@ -2,32 +2,32 @@ import binwalk.core.C
from binwalk.core.common import *
class Plugin(object):
'''
Searches for and validates compress'd data.
'''
READ_SIZE = 64
COMPRESS42 = "compress42"
COMPRESS42_FUNCTIONS = [
binwalk.core.C.Function(name="is_compressed", type=bool),
]
def __init__(self, module):
self.fd = None
self.comp = None
if module.name == 'Signature':
self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)
def scan(self, result):
if self.comp:
if result.file and result.description.lower().startswith("compress'd data"):
fd = BlockFile(result.file.name, "r", offset=result.offset, length=self.READ_SIZE)
compressed_data = fd.read(self.READ_SIZE)
fd.close()
'''
Searches for and validates compress'd data.
'''
READ_SIZE = 64
COMPRESS42 = "compress42"
COMPRESS42_FUNCTIONS = [
binwalk.core.C.Function(name="is_compressed", type=bool),
]
def __init__(self, module):
self.fd = None
self.comp = None
if module.name == 'Signature':
self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)
def scan(self, result):
if self.comp:
if result.file and result.description.lower().startswith("compress'd data"):
fd = BlockFile(result.file.name, "r", offset=result.offset, length=self.READ_SIZE)
compressed_data = fd.read(self.READ_SIZE)
fd.close()
if not self.comp.is_compressed(compressed_data, len(compressed_data)):
result.valid = False
if not self.comp.is_compressed(compressed_data, len(compressed_data)):
result.valid = False
class Plugin(object):
'''
Ensures that ASCII CPIO archive entries only get extracted once.
'''
'''
Ensures that ASCII CPIO archive entries only get extracted once.
'''
def __init__(self, module):
self.found_archive = False
self.enabled = (module.name == 'Signature')
def pre_scan(self, module):
# Be sure to re-set this at the beginning of every scan
self.found_archive = False
def __init__(self, module):
self.found_archive = False
self.enabled = (module.name == 'Signature')
def pre_scan(self, module):
# Be sure to re-set this at the beginning of every scan
self.found_archive = False
def scan(self, result):
if self.enabled and result.valid:
# ASCII CPIO archives consist of multiple entries, ending with an entry named 'TRAILER!!!'.
# Displaying each entry is useful, as it shows what files are contained in the archive,
# but we only want to extract the archive when the first entry is found.
if result.description.startswith('ASCII cpio archive'):
if not self.found_archive:
# This is the first entry. Set found_archive and allow the scan to continue normally.
self.found_archive = True
result.extract = True
elif 'TRAILER!!!' in results['description']:
# This is the last entry, un-set found_archive.
self.found_archive = False
# The first entry has already been found and this is the last entry, or the last entry
# has not yet been found. Don't extract.
result.extract = False
def scan(self, result):
if self.enabled and result.valid:
# ASCII CPIO archives consist of multiple entries, ending with an entry named 'TRAILER!!!'.
# Displaying each entry is useful, as it shows what files are contained in the archive,
# but we only want to extract the archive when the first entry is found.
if result.description.startswith('ASCII cpio archive'):
if not self.found_archive:
# This is the first entry. Set found_archive and allow the scan to continue normally.
self.found_archive = True
result.extract = True
elif 'TRAILER!!!' in results['description']:
# This is the last entry, un-set found_archive.
self.found_archive = False
# The first entry has already been found and this is the last entry, or the last entry
# has not yet been found. Don't extract.
result.extract = False
......@@ -4,61 +4,61 @@ from binwalk.core.compat import *
from binwalk.core.common import BlockFile
class Plugin(object):
'''
Finds and extracts modified LZMA files commonly found in cable modems.
Based on Bernardo Rodrigues' work: http://w00tsec.blogspot.com/2013/11/unpacking-firmware-images-from-cable.html
'''
'''
Finds and extracts modified LZMA files commonly found in cable modems.
Based on Bernardo Rodrigues' work: http://w00tsec.blogspot.com/2013/11/unpacking-firmware-images-from-cable.html
'''
FAKE_LZMA_SIZE = "\x00\x00\x00\x10\x00\x00\x00\x00"
SIGNATURE = "lzma compressed data"
FAKE_LZMA_SIZE = "\x00\x00\x00\x10\x00\x00\x00\x00"
SIGNATURE = "lzma compressed data"
def __init__(self, module):
self.original_cmd = ''
self.enabled = (module.name == 'Signature')
self.module = module
def __init__(self, module):
self.original_cmd = ''
self.enabled = (module.name == 'Signature')
self.module = module
if self.enabled:
# Replace the existing LZMA extraction command with our own
rules = self.module.extractor.get_rules()
for i in range(0, len(rules)):
if rules[i]['regex'].match(self.SIGNATURE):
self.original_cmd = rules[i]['cmd']
rules[i]['cmd'] = self.lzma_cable_extractor
break
if self.enabled:
# Replace the existing LZMA extraction command with our own
rules = self.module.extractor.get_rules()
for i in range(0, len(rules)):
if rules[i]['regex'].match(self.SIGNATURE):
self.original_cmd = rules[i]['cmd']
rules[i]['cmd'] = self.lzma_cable_extractor
break
def lzma_cable_extractor(self, fname):
# Try extracting the LZMA file without modification first
if not self.module.extractor.execute(self.original_cmd, fname):
out_name = os.path.splitext(fname)[0] + '-patched' + os.path.splitext(fname)[1]
fp_out = BlockFile(out_name, 'w')
# Use self.module.config.open_file here to ensure that other config settings (such as byte-swapping) are honored
fp_in = self.module.config.open_file(fname, offset=0, length=0)
fp_in.set_block_size(peek=0)
i = 0
def lzma_cable_extractor(self, fname):
# Try extracting the LZMA file without modification first
if not self.module.extractor.execute(self.original_cmd, fname):
out_name = os.path.splitext(fname)[0] + '-patched' + os.path.splitext(fname)[1]
fp_out = BlockFile(out_name, 'w')
# Use self.module.config.open_file here to ensure that other config settings (such as byte-swapping) are honored
fp_in = self.module.config.open_file(fname, offset=0, length=0)
fp_in.set_block_size(peek=0)
i = 0
while i < fp_in.length:
(data, dlen) = fp_in.read_block()
if i == 0:
out_data = data[0:5] + self.FAKE_LZMA_SIZE + data[5:]
else:
out_data = data
fp_out.write(out_data)
i += dlen
while i < fp_in.length:
(data, dlen) = fp_in.read_block()
if i == 0:
out_data = data[0:5] + self.FAKE_LZMA_SIZE + data[5:]
else:
out_data = data
fp_out.write(out_data)
i += dlen
fp_in.close()
fp_out.close()
fp_in.close()
fp_out.close()
# Overwrite the original file so that it can be cleaned up if -r was specified
shutil.move(out_name, fname)
self.module.extractor.execute(self.original_cmd, fname)
# Overwrite the original file so that it can be cleaned up if -r was specified
shutil.move(out_name, fname)
self.module.extractor.execute(self.original_cmd, fname)
def scan(self, result):
# The modified cable modem LZMA headers all have valid dictionary sizes and a properties byte of 0x5D.
if self.enabled and result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if "properties: 0x5D" in result.description and "invalid dictionary size" not in result.description:
result.valid = True
result.description = result.description.split("invalid uncompressed size")[0] + "missing uncompressed size"
def scan(self, result):
# The modified cable modem LZMA headers all have valid dictionary sizes and a properties byte of 0x5D.
if self.enabled and result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if "properties: 0x5D" in result.description and "invalid dictionary size" not in result.description:
result.valid = True
result.description = result.description.split("invalid uncompressed size")[0] + "missing uncompressed size"
......@@ -2,39 +2,39 @@ import binwalk.core.C
from binwalk.core.common import BlockFile
class Plugin(object):
'''
Searches for and validates zlib compressed data.
'''
MIN_DECOMP_SIZE = 16 * 1024
MAX_DATA_SIZE = 33 * 1024
TINFL = "tinfl"
TINFL_FUNCTIONS = [
binwalk.core.C.Function(name="is_deflated", type=int),
]
def __init__(self, module):
self.tinfl = None
self.module = module
# Only initialize this plugin if this is a signature scan
if module.name == 'Signature':
# Load libtinfl.so
self.tinfl = binwalk.core.C.Library(self.TINFL, self.TINFL_FUNCTIONS)
def scan(self, result):
# If this result is a zlib signature match, try to decompress the data
if self.tinfl and result.file and result.description.lower().startswith('zlib'):
# Seek to and read the suspected zlib data
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
# Check if this is valid zlib data
decomp_size = self.tinfl.is_deflated(data, len(data), 1)
if decomp_size > 0:
result.description += ", uncompressed size >= %d" % decomp_size
else:
result.valid = False
'''
Searches for and validates zlib compressed data.
'''
MIN_DECOMP_SIZE = 16 * 1024
MAX_DATA_SIZE = 33 * 1024
TINFL = "tinfl"
TINFL_FUNCTIONS = [
binwalk.core.C.Function(name="is_deflated", type=int),
]
def __init__(self, module):
self.tinfl = None
self.module = module
# Only initialize this plugin if this is a signature scan
if module.name == 'Signature':
# Load libtinfl.so
self.tinfl = binwalk.core.C.Library(self.TINFL, self.TINFL_FUNCTIONS)
def scan(self, result):
# If this result is a zlib signature match, try to decompress the data
if self.tinfl and result.file and result.description.lower().startswith('zlib'):
# Seek to and read the suspected zlib data
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
# Check if this is valid zlib data
decomp_size = self.tinfl.is_deflated(data, len(data), 1)
if decomp_size > 0:
result.description += ", uncompressed size >= %d" % decomp_size
else:
result.valid = False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment