Commit 4bcef6c6 by Craig Heffner

Modified code to use PEP8 formatting via autopep8

parent 1eab95fb
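
The change below is purely mechanical: the tree was run through autopep8, so every hunk is a whitespace, line-wrapping, or comment-spacing adjustment with no semantic change. For reference, a pass like this can be reproduced from the command line (autopep8 --in-place --recursive over the source tree) or programmatically, roughly as in the sketch below; the exact options used for this commit are not recorded, and the file path is illustrative.

    import autopep8

    # Minimal sketch: read a source file, let autopep8 apply PEP8 fixes,
    # and write the result back. Path and options are illustrative.
    def pep8ify(path):
        with open(path, 'r') as f:
            source = f.read()
        with open(path, 'w') as f:
            f.write(autopep8.fix_code(source))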
......@@ -4,9 +4,13 @@ from binwalk.core.module import Modules
from binwalk.core.exceptions import ModuleException
# Convenience functions
def scan(*args, **kwargs):
with Modules(*args, **kwargs) as m:
objs = m.execute()
return objs
def execute(*args, **kwargs):
return scan(*args, **kwargs)
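
For context, scan() and execute() above are the public API entry points that the rest of this diff ultimately serves; a typical caller looks roughly like the sketch below (the file name is illustrative, and the result attributes follow binwalk's documented module API).

    import binwalk

    # Run a signature scan through the scan() wrapper defined above.
    # Each returned object is an executed module whose .results list
    # holds result objects with .offset and .description attributes.
    for module in binwalk.scan('firmware.bin', signature=True, quiet=True):
        for result in module.results:
            print("0x%.8X    %s" % (result.offset, result.description))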
......@@ -6,10 +6,13 @@ import ctypes.util
import binwalk.core.common
from binwalk.core.compat import *
class Function(object):
'''
Container class for defining library functions.
'''
def __init__(self, **kwargs):
self.name = None
self.type = int
......@@ -17,28 +20,30 @@ class Function(object):
for (k, v) in iterator(kwargs):
setattr(self, k, v)
class FunctionHandler(object):
'''
Class for abstracting function calls via ctypes and handling Python 2/3 compatibility issues.
'''
PY2CTYPES = {
bytes : ctypes.c_char_p,
str : ctypes.c_char_p,
int : ctypes.c_int,
float : ctypes.c_float,
bool : ctypes.c_int,
None : ctypes.c_int,
bytes: ctypes.c_char_p,
str: ctypes.c_char_p,
int: ctypes.c_int,
float: ctypes.c_float,
bool: ctypes.c_int,
None: ctypes.c_int,
}
RETVAL_CONVERTERS = {
None : int,
int : int,
float : float,
bool : bool,
str : bytes2str,
bytes : str2bytes,
None: int,
int: int,
float: float,
bool: bool,
str: bytes2str,
bytes: str2bytes,
}
def __init__(self, library, function):
'''
Class constructor.
......@@ -58,7 +63,7 @@ class FunctionHandler(object):
else:
self.function.restype = self.retype
self.retval_converter = None
#raise Exception("Unknown return type: '%s'" % self.retype)
# raise Exception("Unknown return type: '%s'" % self.retype)
def run(self, *args):
'''
......@@ -70,7 +75,7 @@ class FunctionHandler(object):
'''
args = list(args)
# Python3 expects a bytes object for char *'s, not a str.
# This allows us to pass either, regardless of the Python version.
for i in range(0, len(args)):
if isinstance(args[i], str):
......@@ -81,8 +86,10 @@ class FunctionHandler(object):
retval = self.retval_converter(retval)
return retval
class Library(object):
'''
Class for loading the specified library via ctypes.
'''
......@@ -101,7 +108,7 @@ class Library(object):
if not self.library:
raise Exception("Failed to load library '%s'" % library)
for function in functions:
f = FunctionHandler(self.library, function)
setattr(self, function.name, f.run)
......@@ -110,25 +117,26 @@ class Library(object):
Locates the specified library.
@libraries - Library name (e.g., 'magic' for libmagic), or a list of names.
Returns a string to be passed to ctypes.cdll.LoadLibrary.
'''
lib_path = None
prefix = binwalk.core.common.get_libs_path()
if isinstance(libraries, str):
libraries = [libraries]
for library in libraries:
system_paths = {
'linux' : [os.path.join(prefix, 'lib%s.so' % library), '/usr/local/lib/lib%s.so' % library],
'cygwin' : [os.path.join(prefix, 'lib%s.so' % library), '/usr/local/lib/lib%s.so' % library],
'win32' : [os.path.join(prefix, 'lib%s.dll' % library), '%s.dll' % library],
'darwin' : [os.path.join(prefix, 'lib%s.dylib' % library),
'/opt/local/lib/lib%s.dylib' % library,
'/usr/local/lib/lib%s.dylib' % library,
] + glob.glob('/usr/local/Cellar/*%s*/*/lib/lib%s.dylib' % (library, library)),
'linux': [os.path.join(prefix, 'lib%s.so' % library), '/usr/local/lib/lib%s.so' % library],
'cygwin': [os.path.join(prefix, 'lib%s.so' % library), '/usr/local/lib/lib%s.so' % library],
'win32': [os.path.join(prefix, 'lib%s.dll' % library), '%s.dll' % library],
'darwin': [os.path.join(prefix, 'lib%s.dylib' % library),
'/opt/local/lib/lib%s.dylib' % library,
'/usr/local/lib/lib%s.dylib' % library,
] + glob.glob(
'/usr/local/Cellar/*%s*/*/lib/lib%s.dylib' % (library, library)),
}
for i in range(2, 4):
......@@ -136,27 +144,30 @@ class Library(object):
# Search the common install directories first; these are usually not in the library search path
# Search these *first*, since a) they are the most likely locations and b) there may be a
# discrepancy between where ctypes.util.find_library and ctypes.cdll.LoadLibrary search for libs.
# discrepancy between where ctypes.util.find_library and
# ctypes.cdll.LoadLibrary search for libs.
for path in system_paths[sys.platform]:
binwalk.core.common.debug("Searching for '%s'" % path)
if os.path.exists(path):
lib_path = path
break
# If we failed to find the library, check the standard library search paths
# If we failed to find the library, check the standard library
# search paths
if not lib_path:
lib_path = ctypes.util.find_library(library)
# Use the first library that we can find
if lib_path:
binwalk.core.common.debug("Found library '%s' at: %s" % (library, lib_path))
binwalk.core.common.debug(
"Found library '%s' at: %s" % (library, lib_path))
break
else:
binwalk.core.common.debug("Could not find library '%s'" % library)
binwalk.core.common.debug(
"Could not find library '%s'" % library)
# If we still couldn't find the library, error out
if not lib_path:
raise Exception("Failed to locate libraries '%s'" % str(libraries))
return lib_path
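
To make the Function / FunctionHandler / Library relationship above concrete: the (disabled) compress'd plugin later in this same commit used the API roughly as sketched below — declare the C functions needed, load the library by name, then call the bound wrappers as attributes. The library and function names here are taken from that plugin and illustrate the pattern rather than a working configuration.

    import binwalk.core.C

    # One Function declaration per C symbol to bind; 'type' is the return type.
    FUNCTIONS = [
        binwalk.core.C.Function(name="is_compressed", type=bool),
    ]

    # Library() locates and loads the shared library via ctypes (raising if it
    # cannot be found) and attaches each bound function as an attribute, with
    # FunctionHandler.run handling the str/bytes conversions.
    comp = binwalk.core.C.Library("compress42", FUNCTIONS)
    # comp.is_compressed(data, len(data)) -> bool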
......@@ -21,10 +21,12 @@ if not __debug__:
else:
DEBUG = False
def MSWindows():
# Returns True if running in a Microsoft Windows OS
return (platform.system() == 'Windows')
def debug(msg):
'''
Displays debug messages to stderr only if the Python interpreter was invoked with the -O flag.
......@@ -33,27 +35,32 @@ def debug(msg):
sys.stderr.write("DEBUG: " + msg + "\n")
sys.stderr.flush()
def warning(msg):
'''
Prints warning messages to stderr
'''
sys.stderr.write("\nWARNING: " + msg + "\n")
def error(msg):
'''
Prints error messages to stderr
'''
sys.stderr.write("\nERROR: " + msg + "\n")
def get_module_path():
root = __file__
if os.path.islink(root):
root = os.path.realpath(root)
return os.path.dirname(os.path.dirname(os.path.abspath(root)))
def get_libs_path():
return os.path.join(get_module_path(), "libs")
def file_md5(file_name):
'''
Generate an MD5 hash of the specified file.
......@@ -65,11 +72,12 @@ def file_md5(file_name):
md5 = hashlib.md5()
with open(file_name, 'rb') as f:
for chunk in iter(lambda: f.read(128*md5.block_size), b''):
for chunk in iter(lambda: f.read(128 * md5.block_size), b''):
md5.update(chunk)
return md5.hexdigest()
def file_size(filename):
'''
Obtains the size of a given file.
......@@ -85,10 +93,12 @@ def file_size(filename):
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise Exception("file_size failed to obtain the size of '%s': %s" % (filename, str(e)))
raise Exception(
"file_size failed to obtain the size of '%s': %s" % (filename, str(e)))
finally:
os.close(fd)
def strip_quoted_strings(string):
'''
Strips out data in between double quotes.
......@@ -101,9 +111,11 @@ def strip_quoted_strings(string):
# Note that this removes everything in between the first and last double quote.
# This is intentional, as printed (and quoted) strings from a target file may contain
# double quotes, and this function should ignore those. However, it also means that any
# data between two quoted strings (ex: '"quote 1" you won't see me "quote 2"') will also be stripped.
# data between two quoted strings (ex: '"quote 1" you won't see me "quote
# 2"') will also be stripped.
return re.sub(r'\"(.*)\"', "", string)
def get_quoted_strings(string):
'''
Returns a string comprised of all data in between double quotes.
......@@ -118,13 +130,15 @@ def get_quoted_strings(string):
# Note that this gets everything in between the first and last double quote.
# This is intentional, as printed (and quoted) strings from a target file may contain
# double quotes, and this function should ignore those. However, it also means that any
# data between two quoted strings (ex: '"quote 1" non-quoted data "quote 2"') will also be included.
# data between two quoted strings (ex: '"quote 1" non-quoted data
# "quote 2"') will also be included.
return re.findall(r'\"(.*)\"', string)[0]
except KeyboardInterrupt as e:
raise e
except Exception:
return ''
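
The greedy regex shared by strip_quoted_strings and get_quoted_strings is easy to misread, so here is a small standalone illustration of the behaviour the comments above describe (sample strings are made up):

    import re

    s = 'string "quoted text" trailer'
    re.sub(r'\"(.*)\"', "", s)           # -> 'string  trailer'   (strip_quoted_strings)
    re.findall(r'\"(.*)\"', s)[0]        # -> 'quoted text'       (get_quoted_strings)

    # With two quoted regions, the greedy (.*) spans from the first quote to
    # the last one, which is the intentional behaviour noted in the comments:
    re.findall(r'\"(.*)\"', '"a" mid "b"')[0]   # -> 'a" mid "b'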
def unique_file_name(base_name, extension=''):
'''
Creates a unique file name based on the specified base name.
......@@ -147,6 +161,7 @@ def unique_file_name(base_name, extension=''):
return fname
def strings(filename, minimum=4):
'''
A strings generator, similar to the Unix strings utility.
......@@ -174,13 +189,16 @@ def strings(filename, minimum=4):
else:
result = ""
class GenericContainer(object):
def __init__(self, **kwargs):
for (k,v) in iterator(kwargs):
for (k, v) in iterator(kwargs):
setattr(self, k, v)
class MathExpression(object):
'''
Class for safely evaluating mathematical expressions from a string.
Stolen from: http://stackoverflow.com/questions/2371436/evaluating-a-mathematical-expression-in-a-string
......@@ -213,22 +231,25 @@ class MathExpression(object):
return self._eval(ast.parse(expr).body[0].value)
def _eval(self, node):
if isinstance(node, ast.Num): # <number>
if isinstance(node, ast.Num): # <number>
return node.n
elif isinstance(node, ast.operator): # <operator>
elif isinstance(node, ast.operator): # <operator>
return self.OPERATORS[type(node.op)]
elif isinstance(node, ast.UnaryOp):
return self.OPERATORS[type(node.op)](0, self._eval(node.operand))
elif isinstance(node, ast.BinOp): # <left> <operator> <right>
elif isinstance(node, ast.BinOp): # <left> <operator> <right>
return self.OPERATORS[type(node.op)](self._eval(node.left), self._eval(node.right))
else:
raise TypeError(node)
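
MathExpression._eval above only ever dispatches on ast.Num, ast.UnaryOp and ast.BinOp through a whitelisted OPERATORS table, which is what keeps eval-style expression handling safe. A stripped-down, self-contained version of the same technique (not binwalk's class, just the idea; ast.Num matches the Python 2/3 era of this code and is spelled ast.Constant on modern Python):

    import ast
    import operator

    # Whitelisted binary operators; anything outside this table raises TypeError.
    OPERATORS = {ast.Add: operator.add, ast.Sub: operator.sub,
                 ast.Mult: operator.mul, ast.Div: operator.truediv}

    def safe_eval(expr):
        def _eval(node):
            if isinstance(node, ast.Num):        # <number>
                return node.n
            elif isinstance(node, ast.BinOp):    # <left> <operator> <right>
                return OPERATORS[type(node.op)](_eval(node.left), _eval(node.right))
            raise TypeError(node)
        return _eval(ast.parse(expr, mode='eval').body)

    # e.g. safe_eval("(1024*4)+0x20") -> 4128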
class StringFile(object):
'''
A class to allow access to strings as if they were read from a file.
Used internally as a conditional superclass to InternalBlockFile.
'''
def __init__(self, fname, mode='r'):
self.string = fname
self.name = "String"
......@@ -238,7 +259,7 @@ class StringFile(object):
if n == -1:
data = self.string[self.total_read:]
else:
data = self.string[self.total_read:self.total_read+n]
data = self.string[self.total_read:self.total_read + n]
return data
def tell(self):
......@@ -253,10 +274,12 @@ class StringFile(object):
def close(self):
pass
def BlockFile(fname, mode='r', subclass=io.FileIO, **kwargs):
# Defining a class inside a function allows it to be dynamically subclassed
class InternalBlockFile(subclass):
'''
Abstraction class for accessing binary files.
......@@ -289,7 +312,8 @@ def BlockFile(fname, mode='r', subclass=io.FileIO, **kwargs):
DEFAULT_BLOCK_PEEK_SIZE = 8 * 1024
# Max number of bytes to process at one time. This needs to be large enough to
# limit disk I/O, but small enough to limit the size of processed data blocks.
# limit disk I/O, but small enough to limit the size of processed data
# blocks.
DEFAULT_BLOCK_READ_SIZE = 1 * 1024 * 1024
def __init__(self, fname, mode='r', length=0, offset=0, block=DEFAULT_BLOCK_READ_SIZE, peek=DEFAULT_BLOCK_PEEK_SIZE, swap=0):
......@@ -310,7 +334,8 @@ def BlockFile(fname, mode='r', subclass=io.FileIO, **kwargs):
self.block_read_size = self.DEFAULT_BLOCK_READ_SIZE
self.block_peek_size = self.DEFAULT_BLOCK_PEEK_SIZE
# This is so that custom parent classes can access/modify arguments as necessary
# This is so that custom parent classes can access/modify arguments
# as necessary
self.args = GenericContainer(fname=fname,
mode=mode,
length=length,
......@@ -390,7 +415,7 @@ def BlockFile(fname, mode='r', subclass=io.FileIO, **kwargs):
if self.swap_size > 0:
while i < len(block):
data += block[i:i+self.swap_size][::-1]
data += block[i:i + self.swap_size][::-1]
i += self.swap_size
else:
data = block
......@@ -398,7 +423,8 @@ def BlockFile(fname, mode='r', subclass=io.FileIO, **kwargs):
return data
def reset(self):
self.set_block_size(block=self.base_block_size, peek=self.base_peek_size)
self.set_block_size(
block=self.base_block_size, peek=self.base_peek_size)
self.seek(self.offset)
def set_block_size(self, block=None, peek=None):
......@@ -444,7 +470,7 @@ def BlockFile(fname, mode='r', subclass=io.FileIO, **kwargs):
n = self.length - self.total_read
while n < 0 or l < n:
tmp = super(self.__class__, self).read(n-l)
tmp = super(self.__class__, self).read(n - l)
if tmp:
data += tmp
l += len(tmp)
......@@ -487,4 +513,3 @@ def BlockFile(fname, mode='r', subclass=io.FileIO, **kwargs):
return (data, dlen)
return InternalBlockFile(fname, mode=mode, **kwargs)
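
One detail of InternalBlockFile worth spelling out is the swap handling: when swap_size is non-zero, each swap_size-byte chunk of a block is reversed before being returned, which is how the --swap/-g option handles byte-swapped targets. A standalone illustration of the same transform (function name is mine, not binwalk's):

    def swap_every_n(block, swap_size):
        # Reverse every swap_size-byte chunk, mirroring the loop shown above.
        data = b''
        i = 0
        while i < len(block):
            data += block[i:i + swap_size][::-1]
            i += swap_size
        return data

    # e.g. swap_every_n(b'\x11\x22\x33\x44', 2) -> b'\x22\x11\x44\x33'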
......@@ -9,6 +9,7 @@ PY_MAJOR_VERSION = sys.version_info[0]
if PY_MAJOR_VERSION > 2:
string.letters = string.ascii_letters
def iterator(dictionary):
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
......@@ -18,6 +19,7 @@ def iterator(dictionary):
else:
return dictionary.iteritems()
def has_key(dictionary, key):
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
......@@ -27,6 +29,7 @@ def has_key(dictionary, key):
else:
return dictionary.has_key(key)
def get_keys(dictionary):
'''
For cross compatibility between Python 2 and Python 3 dictionaries.
......@@ -36,6 +39,7 @@ def get_keys(dictionary):
else:
return dictionary.keys()
def str2bytes(string):
'''
For cross compatibility between Python 2 and Python 3 strings.
......@@ -45,6 +49,7 @@ def str2bytes(string):
else:
return string
def bytes2str(bs):
'''
For cross compatibility between Python 2 and Python 3 strings.
......@@ -54,6 +59,7 @@ def bytes2str(bs):
else:
return bs
def string_decode(string):
'''
For cross compatibility between Python 2 and Python 3 strings.
......@@ -63,6 +69,7 @@ def string_decode(string):
else:
return string.decode('string_escape')
def user_input(prompt=''):
'''
For getting raw user input in Python 2 and 3.
......@@ -71,4 +78,3 @@ def user_input(prompt=''):
return input(prompt)
else:
return raw_input(prompt)
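
The compat helpers above exist so that module code never has to branch on the interpreter version itself; typical usage looks like the sketch below (dictionary contents are illustrative):

    from binwalk.core.compat import iterator, has_key, get_keys

    d = {'offset': 0, 'description': 'example'}

    # Each helper hides the Python 2 iteritems()/has_key()/keys() differences,
    # so the same loop works under both interpreters.
    for (k, v) in iterator(d):
        print(k, v)

    print(has_key(d, 'offset'))    # True
    print(list(get_keys(d)))       # ['offset', 'description']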
......@@ -7,7 +7,9 @@ import datetime
import binwalk.core.common
from binwalk.core.compat import *
class Display(object):
'''
Class to handle display of output and writing to log files.
This class is instantiated for all modules implicitly and should not need to be invoked directly by most modules.
......@@ -100,26 +102,32 @@ class Display(object):
self.log("", [file_name, md5sum, timestamp])
self._fprint("%s", "\n", csv=False)
self._fprint("Scan Time: %s\n", [timestamp], csv=False, filter=False)
self._fprint("Target File: %s\n", [file_name], csv=False, filter=False)
self._fprint("MD5 Checksum: %s\n", [md5sum], csv=False, filter=False)
self._fprint("Scan Time: %s\n", [
timestamp], csv=False, filter=False)
self._fprint("Target File: %s\n", [
file_name], csv=False, filter=False)
self._fprint(
"MD5 Checksum: %s\n", [md5sum], csv=False, filter=False)
if self.custom_verbose_format and self.custom_verbose_args:
self._fprint(self.custom_verbose_format, self.custom_verbose_args, csv=False, filter=False)
self._fprint(
self.custom_verbose_format, self.custom_verbose_args, csv=False, filter=False)
self._fprint("%s", "\n", csv=False, filter=False)
self._fprint(self.header_format, args, filter=False)
self._fprint("%s", ["-" * self.HEADER_WIDTH + "\n"], csv=False, filter=False)
self._fprint(
"%s", ["-" * self.HEADER_WIDTH + "\n"], csv=False, filter=False)
def result(self, *args):
# Convert to list for item assignment
args = list(args)
# Replace multiple spaces with single spaces. This is to prevent accidentally putting
# four spaces in the description string, which would break auto-formatting.
# four spaces in the description string, which would break
# auto-formatting.
for i in range(len(args)):
if isinstance(args[i], str):
while " " in args[i]:
args[i] = args[i].replace(" " , " ")
args[i] = args[i].replace(" ", " ")
self._fprint(self.result_format, tuple(args))
......@@ -177,13 +185,15 @@ class Display(object):
offset = 0
self.string_parts = []
# Split the line into an array of columns, e.g., ['0', '0x00000000', 'Some description here']
line_columns = line.split(None, self.num_columns-1)
# Split the line into an array of columns, e.g., ['0', '0x00000000',
# 'Some description here']
line_columns = line.split(None, self.num_columns - 1)
if line_columns:
# Find where the start of the last column (description) starts in the line of text.
# All line wraps need to be aligned to this offset.
offset = line.rfind(line_columns[-1])
# The delimiter will be a newline followed by spaces padding out the line wrap to the alignment offset.
# The delimiter will be a newline followed by spaces padding out
# the line wrap to the alignment offset.
delim += ' ' * offset
if line_columns and self.fit_to_screen and len(line) > self.SCREEN_WIDTH:
......@@ -194,19 +204,25 @@ class Display(object):
# Loop to split up line into multiple max_line_wrap_length pieces
while len(line[offset:]) > max_line_wrap_length:
# Find the nearest space to wrap the line at (so we don't split a word across two lines)
split_offset = line[offset:offset+max_line_wrap_length].rfind(' ')
# If there were no good places to split the line, just truncate it at max_line_wrap_length
# Find the nearest space to wrap the line at (so we don't split
# a word across two lines)
split_offset = line[
offset:offset + max_line_wrap_length].rfind(' ')
# If there were no good places to split the line, just truncate
# it at max_line_wrap_length
if split_offset < 1:
split_offset = max_line_wrap_length
self._append_to_data_parts(line, offset, offset+split_offset)
self._append_to_data_parts(line, offset, offset + split_offset)
offset += split_offset
# Add any remaining data (guaranteed to be max_line_wrap_length long or shorter) to self.string_parts
self._append_to_data_parts(line, offset, offset+len(line[offset:]))
# Add any remaining data (guaranteed to be max_line_wrap_length
# long or shorter) to self.string_parts
self._append_to_data_parts(
line, offset, offset + len(line[offset:]))
# Append self.string_parts to formatted_line; each part separated by delim
# Append self.string_parts to formatted_line; each part separated
# by delim
formatted_line += delim.join(self.string_parts)
else:
formatted_line = line
......@@ -228,10 +244,10 @@ class Display(object):
import termios
# Get the terminal window width
hw = struct.unpack('hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))
hw = struct.unpack(
'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))
self.SCREEN_WIDTH = self.HEADER_WIDTH = hw[1]
except KeyboardInterrupt as e:
raise e
except Exception:
pass
class ParserException(Exception):
'''
Exception thrown specifically for signature file parsing errors.
'''
pass
class ModuleException(Exception):
'''
Module exception class.
Nothing special here except the name.
'''
pass
class IgnoreFileException(Exception):
'''
Special exception class used by the load_file plugin method
to indicate that the file that we are attempting to load
......@@ -4,11 +4,14 @@ import io
import os
import logging
class ShutUpHashlib(logging.Filter):
'''
This is used to suppress hashlib exception messages
if using the Python interpreter bundled with IDA.
'''
def filter(self, record):
return not record.getMessage().startswith("code for hash")
......@@ -16,14 +19,16 @@ try:
import idc
import idaapi
LOADED_IN_IDA = True
logger = logging.getLogger()
logger.addFilter(ShutUpHashlib())
except ImportError:
LOADED_IN_IDA = False
def start_address():
return idaapi.get_first_seg().startEA
def end_address():
last_ea = idc.BADADDR
seg = idaapi.get_first_seg()
......@@ -34,7 +39,9 @@ def end_address():
return last_ea
class IDBFileIO(io.FileIO):
'''
A custom class to override binwalk.core.common.Blockfile in order to
read data directly out of the IDB, rather than reading from the original
......@@ -58,7 +65,7 @@ class IDBFileIO(io.FileIO):
if self.args.size == 0:
self.args.size = end_address()
if self.args.offset == 0:
self.args.offset = start_address()
elif self.args.offset < 0:
......@@ -89,7 +96,7 @@ class IDBFileIO(io.FileIO):
if filler_count:
data += "\x00" * filler_count
filler_count = 0
if (self.idb_pos + n) > segment.endEA:
read_count = segment.endEA - self.idb_pos
else:
......@@ -98,7 +105,8 @@ class IDBFileIO(io.FileIO):
try:
data += idc.GetManyBytes(self.idb_pos, read_count)
except TypeError as e:
# This happens when trying to read from uninitialized segments (e.g., .bss)
# This happens when trying to read from uninitialized
# segments (e.g., .bss)
data += "\x00" * read_count
n -= read_count
......@@ -116,7 +124,7 @@ class IDBFileIO(io.FileIO):
else:
# Don't actually write anything to the IDB, as, IMHO,
# a binwalk plugin should never do this. But return the
# number of bytes we were requested to write so that
# any callers are happy.
return len(data)
......@@ -136,4 +144,3 @@ class IDBFileIO(io.FileIO):
return super(IDBFileIO, self).tell()
else:
return self.idb_pos
......@@ -9,12 +9,15 @@ import binwalk.core.settings
from binwalk.core.compat import *
from binwalk.core.exceptions import IgnoreFileException
class Plugin(object):
'''
Class from which all plugin classes are based.
'''
# A list of case-sensitive module names for which this plugin should be loaded.
# If no module names are specified, the plugin will be loaded for all modules.
# If no module names are specified, the plugin will be loaded for all
# modules.
MODULES = []
def __init__(self, module):
......@@ -64,7 +67,9 @@ class Plugin(object):
'''
pass
class Plugins(object):
'''
Class to load and call plugin callback functions, handled automatically by Binwalk.scan / Binwalk.single_scan.
An instance of this class is available during a scan via the Binwalk.plugins object.
......@@ -114,7 +119,8 @@ class Plugins(object):
except IgnoreFileException as e:
raise e
except Exception as e:
binwalk.core.common.warning("%s.%s failed: %s" % (callback.__module__, callback.__name__, e))
binwalk.core.common.warning(
"%s.%s failed: %s" % (callback.__module__, callback.__name__, e))
def _find_plugin_class(self, plugin):
for (name, klass) in inspect.getmembers(plugin, inspect.isclass):
......@@ -145,17 +151,17 @@ class Plugins(object):
'''
plugins = {
'user' : {
'modules' : [],
'descriptions' : {},
'enabled' : {},
'path' : None,
'user': {
'modules': [],
'descriptions': {},
'enabled': {},
'path': None,
},
'system' : {
'modules' : [],
'descriptions' : {},
'enabled' : {},
'path' : None,
'system': {
'modules': [],
'descriptions': {},
'enabled': {},
'path': None,
}
}
......@@ -171,7 +177,8 @@ class Plugins(object):
module = file_name[:-len(self.MODULE_EXTENSION)]
try:
plugin = imp.load_source(module, os.path.join(plugins[key]['path'], file_name))
plugin = imp.load_source(
module, os.path.join(plugins[key]['path'], file_name))
plugin_class = self._find_plugin_class(plugin)
plugins[key]['enabled'][module] = True
......@@ -179,15 +186,18 @@ class Plugins(object):
except KeyboardInterrupt as e:
raise e
except Exception as e:
binwalk.core.common.warning("Error loading plugin '%s': %s" % (file_name, str(e)))
binwalk.core.common.warning(
"Error loading plugin '%s': %s" % (file_name, str(e)))
plugins[key]['enabled'][module] = False
try:
plugins[key]['descriptions'][module] = plugin_class.__doc__.strip().split('\n')[0]
plugins[key]['descriptions'][
module] = plugin_class.__doc__.strip().split('\n')[0]
except KeyboardInterrupt as e:
raise e
except Exception as e:
plugins[key]['descriptions'][module] = 'No description'
plugins[key]['descriptions'][
module] = 'No description'
return plugins
def load_plugins(self):
......@@ -198,7 +208,8 @@ class Plugins(object):
def _load_plugin_modules(self, plugins):
for module in plugins['modules']:
try:
file_path = os.path.join(plugins['path'], module + self.MODULE_EXTENSION)
file_path = os.path.join(
plugins['path'], module + self.MODULE_EXTENSION)
except KeyboardInterrupt as e:
raise e
except Exception:
......@@ -220,7 +231,8 @@ class Plugins(object):
pass
try:
self.load_file.append(getattr(class_instance, self.LOADFILE))
self.load_file.append(
getattr(class_instance, self.LOADFILE))
except KeyboardInterrupt as e:
raise e
except Exception as e:
......@@ -234,7 +246,8 @@ class Plugins(object):
pass
try:
self.post_scan.append(getattr(class_instance, self.POSTSCAN))
self.post_scan.append(
getattr(class_instance, self.POSTSCAN))
except KeyboardInterrupt as e:
raise e
except Exception as e:
......@@ -250,7 +263,8 @@ class Plugins(object):
except KeyboardInterrupt as e:
raise e
except Exception as e:
binwalk.core.common.warning("Failed to load plugin module '%s': %s" % (module, str(e)))
binwalk.core.common.warning(
"Failed to load plugin module '%s': %s" % (module, str(e)))
def pre_scan_callbacks(self, obj):
return self._call_plugins(self.pre_scan)
......@@ -266,4 +280,3 @@ class Plugins(object):
def scan_callbacks(self, obj):
return self._call_plugins(self.scan, obj)
# Code for loading and accessing binwalk settings (extraction rules, signature files, etc).
# Code for loading and accessing binwalk settings (extraction rules,
# signature files, etc).
import os
import binwalk.core.common as common
from binwalk.core.compat import *
class Settings:
'''
Binwalk settings class, used for accessing user and system file paths and general configuration settings.
......@@ -41,18 +44,26 @@ class Settings:
self.system_dir = common.get_module_path()
# Build the paths to all user-specific files
self.user = common.GenericContainer(binarch=self._user_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(user_only=True),
extract=self._user_path(self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
modules=self._user_path(self.BINWALK_MODULES_DIR),
plugins=self._user_path(self.BINWALK_PLUGINS_DIR))
self.user = common.GenericContainer(
binarch=self._user_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(
user_only=True),
extract=self._user_path(
self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
modules=self._user_path(
self.BINWALK_MODULES_DIR),
plugins=self._user_path(self.BINWALK_PLUGINS_DIR))
# Build the paths to all system-wide files
self.system = common.GenericContainer(binarch=self._system_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(system_only=True),
extract=self._system_path(self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
plugins=self._system_path(self.BINWALK_PLUGINS_DIR))
self.system = common.GenericContainer(
binarch=self._system_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(
system_only=True),
extract=self._system_path(
self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
plugins=self._system_path(self.BINWALK_PLUGINS_DIR))
def _magic_signature_files(self, system_only=False, user_only=False):
'''
......@@ -64,15 +75,18 @@ class Settings:
Returns a list of user/system magic signature files.
'''
files = []
user_binarch = self._user_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
system_binarch = self._system_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
user_binarch = self._user_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
system_binarch = self._system_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
def list_files(dir_path):
# Ignore hidden dotfiles.
return [os.path.join(dir_path, x) for x in os.listdir(dir_path) if not x.startswith('.')]
if not system_only:
user_dir = os.path.join(self.user_dir, self.BINWALK_USER_DIR, self.BINWALK_MAGIC_DIR)
user_dir = os.path.join(
self.user_dir, self.BINWALK_USER_DIR, self.BINWALK_MAGIC_DIR)
files += list_files(user_dir)
if not user_only:
system_dir = os.path.join(self.system_dir, self.BINWALK_MAGIC_DIR)
......@@ -175,7 +189,7 @@ class Settings:
'''
try:
return self._file_path(os.path.join(self.user_dir, self.BINWALK_USER_DIR, subdir), basename)
except KeyboardInterrupt as e :
except KeyboardInterrupt as e:
raise e
except Exception:
return None
......@@ -191,8 +205,7 @@ class Settings:
'''
try:
return self._file_path(os.path.join(self.system_dir, subdir), basename)
except KeyboardInterrupt as e :
except KeyboardInterrupt as e:
raise e
except Exception:
return None
......@@ -13,6 +13,7 @@ try:
except ImportError:
import socketserver as SocketServer
class StatusRequestHandler(SocketServer.BaseRequestHandler):
def handle(self):
......@@ -27,47 +28,57 @@ class StatusRequestHandler(SocketServer.BaseRequestHandler):
time.sleep(0.1)
try:
self.request.send(binwalk.core.compat.str2bytes('\b' * last_status_message_len))
self.request.send(binwalk.core.compat.str2bytes(' ' * last_status_message_len))
self.request.send(binwalk.core.compat.str2bytes('\b' * last_status_message_len))
self.request.send(
binwalk.core.compat.str2bytes('\b' * last_status_message_len))
self.request.send(
binwalk.core.compat.str2bytes(' ' * last_status_message_len))
self.request.send(
binwalk.core.compat.str2bytes('\b' * last_status_message_len))
if self.server.binwalk.status.shutdown:
self.server.binwalk.status.finished = True
break
if self.server.binwalk.status.total != 0:
percentage = ((float(self.server.binwalk.status.completed) / float(self.server.binwalk.status.total)) * 100)
status_message = message_format % (self.server.binwalk.status.fp.path,
percentage,
self.server.binwalk.status.completed,
self.server.binwalk.status.total)
percentage = (
(float(self.server.binwalk.status.completed) / float(self.server.binwalk.status.total)) * 100)
status_message = message_format % (
self.server.binwalk.status.fp.path,
percentage,
self.server.binwalk.status.completed,
self.server.binwalk.status.total)
elif not message_sent:
status_message = "No status information available at this time!"
else:
continue
last_status_message_len = len(status_message)
self.request.send(binwalk.core.compat.str2bytes(status_message))
self.request.send(
binwalk.core.compat.str2bytes(status_message))
message_sent = True
except IOError as e:
if e.errno == errno.EPIPE:
break
except Exception as e:
binwalk.core.common.debug('StatusRequestHandler exception: ' + str(e) + '\n')
binwalk.core.common.debug(
'StatusRequestHandler exception: ' + str(e) + '\n')
except KeyboardInterrupt as e:
raise e
self.server.binwalk.status.running = False
return
class ThreadedStatusServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
daemon_threads = True
allow_reuse_address = True
class StatusServer(object):
def __init__(self, port, binwalk):
self.server = ThreadedStatusServer(('127.0.0.1', port), StatusRequestHandler)
self.server = ThreadedStatusServer(
('127.0.0.1', port), StatusRequestHandler)
self.server.binwalk = binwalk
t = threading.Thread(target=self.server.serve_forever)
......@@ -17,6 +17,6 @@ from binwalk.modules.extractor import Extractor
from binwalk.modules.entropy import Entropy
# These are deprecated.
#from binwalk.modules.binvis import Plotter
#from binwalk.modules.hashmatch import HashMatch
#from binwalk.modules.heuristics import HeuristicCompressionAnalyzer
# from binwalk.modules.binvis import Plotter
# from binwalk.modules.hashmatch import HashMatch
# from binwalk.modules.heuristics import HeuristicCompressionAnalyzer
......@@ -5,7 +5,9 @@ from binwalk.core.compat import *
from binwalk.core.common import BlockFile
from binwalk.core.module import Module, Option, Kwarg
class Plotter(Module):
'''
Base class for visualizing binaries in Qt.
Other plotter classes are derived from this.
......@@ -17,33 +19,34 @@ class Plotter(Module):
TITLE = "Binary Visualization"
CLI = [
Option(short='3',
long='3D',
kwargs={'axis' : 3, 'enabled' : True},
description='Generate a 3D binary visualization'),
Option(short='2',
long='2D',
kwargs={'axis' : 2, 'enabled' : True},
description='Project data points onto 3D cube walls only'),
Option(short='V',
long='points',
type=int,
kwargs={'max_points' : 0},
description='Set the maximum number of plotted data points'),
# Option(short='V',
# long='grids',
# kwargs={'show_grids' : True},
# description='Display the x-y-z grids in the resulting plot'),
Option(short='3',
long='3D',
kwargs={'axis': 3, 'enabled': True},
description='Generate a 3D binary visualization'),
Option(short='2',
long='2D',
kwargs={'axis': 2, 'enabled': True},
description='Project data points onto 3D cube walls only'),
Option(short='V',
long='points',
type=int,
kwargs={'max_points': 0},
description='Set the maximum number of plotted data points'),
# Option(short='V',
# long='grids',
# kwargs={'show_grids' : True},
# description='Display the x-y-z grids in the resulting plot'),
]
KWARGS = [
Kwarg(name='axis', default=3),
Kwarg(name='max_points', default=0),
Kwarg(name='show_grids', default=False),
Kwarg(name='enabled', default=False),
Kwarg(name='axis', default=3),
Kwarg(name='max_points', default=0),
Kwarg(name='show_grids', default=False),
Kwarg(name='enabled', default=False),
]
# There isn't really any useful data to print to console. Disable header and result output.
# There isn't really any useful data to print to console. Disable header
# and result output.
HEADER = None
RESULT = None
......@@ -64,7 +67,8 @@ class Plotter(Module):
self.MAX_PLOT_POINTS = self.MAX_3D_PLOT_POINTS
self._generate_data_point = self._generate_3d_data_point
else:
raise Exception("Invalid Plotter axis specified: %d. Must be one of: [2,3]" % self.axis)
raise Exception(
"Invalid Plotter axis specified: %d. Must be one of: [2,3]" % self.axis)
if not self.max_points:
self.max_points = self.MAX_PLOT_POINTS
......@@ -106,7 +110,8 @@ class Plotter(Module):
# Go through every data point and how many times that point occurs
for (point, count) in iterator(data_points):
# For each data point, compare it to each remaining weight value
# For each data point, compare it to each remaining weight
# value
for w in get_keys(weightings):
# If the number of times this data point occurred is >= the weight value,
......@@ -119,18 +124,21 @@ class Plotter(Module):
else:
break
# Throw out weight values that exceed the maximum number of data points
# Throw out weight values that exceed the maximum number of
# data points
if weightings[w] > self.max_points:
del weightings[w]
# If there's only one weight value left, no sense in continuing the loop...
# If there's only one weight value left, no sense in continuing
# the loop...
if len(weightings) == 1:
break
# The least weighted value is our minimum weight
min_weight = min(weightings)
# Get rid of all data points that occur less frequently than our minimum weight
# Get rid of all data points that occur less frequently than our
# minimum weight
for point in get_keys(data_points):
if data_points[point] < min_weight:
del data_points[point]
......@@ -138,7 +146,8 @@ class Plotter(Module):
for point in sorted(data_points, key=data_points.get, reverse=True):
plot_points[point] = data_points[point]
# Register this as a result in case future modules need access to the raw point information,
# but mark plot as False to prevent the entropy module from attempting to overlay this data on its graph.
# but mark plot as False to prevent the entropy module from
# attempting to overlay this data on its graph.
self.result(point=point, plot=False)
total += 1
if total >= self.max_points:
......@@ -154,7 +163,7 @@ class Plotter(Module):
Returns a data point tuple.
'''
return (0,0,0)
return (0, 0, 0)
def _generate_data_points(self, fp):
'''
......@@ -178,8 +187,8 @@ class Plotter(Module):
break
i = 0
while (i+(self.axis-1)) < dlen:
point = self._generate_data_point(data[i:i+self.axis])
while (i + (self.axis - 1)) < dlen:
point = self._generate_data_point(data[i:i + self.axis])
if has_key(data_points, point):
data_points[point] += 1
else:
......@@ -208,7 +217,8 @@ class Plotter(Module):
frequency_percentage = (weight / nitems)
# Give points that occur more frequently a brighter color and larger point size.
# Frequency is determined as a percentage of total unique data points.
# Frequency is determined as a percentage of total unique data
# points.
if frequency_percentage > .010:
size[i] = .20
r = 1.0
......@@ -227,7 +237,8 @@ class Plotter(Module):
i += 1
scatter_plot = gl.GLScatterPlotItem(pos=pos, size=size, color=color, pxMode=False)
scatter_plot = gl.GLScatterPlotItem(
pos=pos, size=size, color=color, pxMode=False)
scatter_plot.translate(-127.5, -127.5, -127.5)
return scatter_plot
......@@ -258,12 +269,14 @@ class Plotter(Module):
for fd in iter(self.next_file, None):
data_points = self._generate_data_points(fd)
self._print("Generating plot points from %d data points" % len(data_points))
self._print("Generating plot points from %d data points" %
len(data_points))
self.plot_points = self._generate_plot_points(data_points)
del data_points
self._print("Generating graph from %d plot points" % len(self.plot_points))
self._print("Generating graph from %d plot points" %
len(self.plot_points))
self.window.addItem(self._generate_plot(self.plot_points))
......@@ -307,4 +320,3 @@ class Plotter(Module):
def run(self):
self.plot()
return True
# Module to process general user input options (scan length, starting offset, etc).
# Module to process general user input options (scan length, starting
# offset, etc).
import io
import os
......@@ -12,6 +13,7 @@ import binwalk.core.settings
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg, show_help
class General(Module):
TITLE = "General"
......@@ -23,77 +25,77 @@ class General(Module):
Option(long='length',
short='l',
type=int,
kwargs={'length' : 0},
kwargs={'length': 0},
description='Number of bytes to scan'),
Option(long='offset',
short='o',
type=int,
kwargs={'offset' : 0},
kwargs={'offset': 0},
description='Start scan at this file offset'),
Option(long='base',
short='O',
type=int,
kwargs={'base' : 0},
kwargs={'base': 0},
description='Add a base address to all printed offsets'),
Option(long='block',
short='K',
type=int,
kwargs={'block' : 0},
kwargs={'block': 0},
description='Set file block size'),
Option(long='swap',
short='g',
type=int,
kwargs={'swap_size' : 0},
kwargs={'swap_size': 0},
description='Reverse every n bytes before scanning'),
Option(long='log',
short='f',
type=argparse.FileType,
kwargs={'log_file' : None},
kwargs={'log_file': None},
description='Log results to file'),
Option(long='csv',
short='c',
kwargs={'csv' : True},
kwargs={'csv': True},
description='Log results to file in CSV format'),
Option(long='term',
short='t',
kwargs={'format_to_terminal' : True},
kwargs={'format_to_terminal': True},
description='Format output to fit the terminal window'),
Option(long='quiet',
short='q',
kwargs={'quiet' : True},
kwargs={'quiet': True},
description='Suppress output to stdout'),
Option(long='verbose',
short='v',
kwargs={'verbose' : True},
kwargs={'verbose': True},
description='Enable verbose output'),
Option(short='h',
long='help',
kwargs={'show_help' : True},
kwargs={'show_help': True},
description='Show help output'),
Option(short='a',
long='finclude',
type=str,
kwargs={'file_name_include_regex' : ""},
kwargs={'file_name_include_regex': ""},
description='Only scan files whose names match this regex'),
Option(short='p',
long='fexclude',
type=str,
kwargs={'file_name_exclude_regex' : ""},
kwargs={'file_name_exclude_regex': ""},
description='Do not scan files whose names match this regex'),
Option(short='s',
long='status',
type=int,
kwargs={'status_server_port' : 0},
kwargs={'status_server_port': 0},
description='Enable the status server on the specified port'),
Option(long=None,
short=None,
type=binwalk.core.common.BlockFile,
kwargs={'files' : []}),
kwargs={'files': []}),
# Hidden, API-only arguments
Option(long="string",
hidden=True,
kwargs={'subclass' : binwalk.core.common.StringFile}),
kwargs={'subclass': binwalk.core.common.StringFile}),
]
KWARGS = [
......@@ -132,9 +134,11 @@ class General(Module):
# Build file name filter regex rules
if self.file_name_include_regex:
self.file_name_include_regex = re.compile(self.file_name_include_regex)
self.file_name_include_regex = re.compile(
self.file_name_include_regex)
if self.file_name_exclude_regex:
self.file_name_exclude_regex = re.compile(self.file_name_exclude_regex)
self.file_name_exclude_regex = re.compile(
self.file_name_exclude_regex)
self.settings = binwalk.core.settings.Settings()
self.display = binwalk.core.display.Display(log=self.log_file,
......@@ -160,7 +164,8 @@ class General(Module):
Must be called after self._test_target_files so that self.target_files is properly set.
'''
# If more than one target file was specified, enable verbose mode; else, there is
# nothing in some outputs to indicate which scan corresponds to which file.
# nothing in some outputs to indicate which scan corresponds to which
# file.
if len(self.target_files) > 1 and not self.verbose:
self.verbose = True
......@@ -217,4 +222,3 @@ class General(Module):
raise e
except Exception as e:
self.error(description="Cannot open file : %s" % str(e))
# Routines to perform Chi Squared tests.
# Used for fingerprinting unknown areas of high entropy (e.g., is this block of high entropy data compressed or encrypted?).
# Inspired by people who actually know what they're doing: http://www.fourmilab.ch/random/
# Inspired by people who actually know what they're doing:
# http://www.fourmilab.ch/random/
import math
from binwalk.core.compat import *
from binwalk.core.module import Module, Kwarg, Option, Dependency
class ChiSquare(object):
'''
Performs a Chi Squared test against the provided data.
'''
......@@ -20,12 +23,13 @@ class ChiSquare(object):
Returns None.
'''
self.bytes = {}
self.freedom = self.IDEAL - 1
# Initialize the self.bytes dictionary with keys for all possible byte values (0 - 255)
# Initialize the self.bytes dictionary with keys for all possible byte
# values (0 - 255)
for i in range(0, int(self.IDEAL)):
self.bytes[chr(i)] = 0
self.reset()
def reset(self):
......@@ -33,7 +37,7 @@ class ChiSquare(object):
self.byte_count = 0
for key in self.bytes.keys():
self.bytes[key] = 0
def update(self, data):
'''
......@@ -59,20 +63,23 @@ class ChiSquare(object):
if expected:
for byte in self.bytes.values():
self.xc2 += ((byte - expected) ** 2 ) / expected
self.xc2 += ((byte - expected) ** 2) / expected
return self.xc2
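
The chisq() computation above is the standard statistic sum((observed - expected)**2 / expected) over the 256 byte-value bins, with expected = byte_count / 256. A tiny worked version of the same arithmetic on a standalone histogram (not binwalk's class) shows why uniform data scores near zero and heavily skewed data scores very high:

    def chi_square(counts):
        # counts maps each of the 256 byte values to its number of occurrences.
        total = sum(counts.values())
        expected = total / 256.0
        return sum(((observed - expected) ** 2) / expected
                   for observed in counts.values())

    uniform = dict((i, 4) for i in range(256))   # 1024 bytes, flat histogram
    skewed = dict((i, 0) for i in range(256))
    skewed[0] = 1024                             # 1024 identical bytes
    chi_square(uniform)                          # 0.0
    chi_square(skewed)                           # 261120.0, dominated by (1024-4)**2/4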
class EntropyBlock(object):
def __init__(self, **kwargs):
self.start = None
self.end = None
self.length = None
for (k,v) in iterator(kwargs):
for (k, v) in iterator(kwargs):
setattr(self, k, v)
class HeuristicCompressionAnalyzer(Module):
'''
Performs analysis and attempts to interpret the results.
'''
......@@ -87,26 +94,27 @@ class HeuristicCompressionAnalyzer(Module):
TITLE = "Heuristic Compression"
DEPENDS = [
Dependency(name='Entropy',
attribute='entropy',
kwargs={'enabled' : True, 'do_plot' : False, 'display_results' : False, 'block_size' : ENTROPY_BLOCK_SIZE}),
Dependency(name='Entropy',
attribute='entropy',
kwargs={
'enabled': True, 'do_plot': False, 'display_results': False, 'block_size': ENTROPY_BLOCK_SIZE}),
]
CLI = [
Option(short='H',
long='heuristic',
kwargs={'enabled' : True},
description='Heuristically classify high entropy data'),
Option(short='a',
long='trigger',
kwargs={'trigger_level' : 0},
type=float,
description='Set the entropy trigger level (0.0 - 1.0, default: %.2f)' % ENTROPY_TRIGGER),
Option(short='H',
long='heuristic',
kwargs={'enabled': True},
description='Heuristically classify high entropy data'),
Option(short='a',
long='trigger',
kwargs={'trigger_level': 0},
type=float,
description='Set the entropy trigger level (0.0 - 1.0, default: %.2f)' % ENTROPY_TRIGGER),
]
KWARGS = [
Kwarg(name='enabled', default=False),
Kwarg(name='trigger_level', default=ENTROPY_TRIGGER),
Kwarg(name='enabled', default=False),
Kwarg(name='trigger_level', default=ENTROPY_TRIGGER),
]
def init(self):
......@@ -130,17 +138,19 @@ class HeuristicCompressionAnalyzer(Module):
self.blocks[result.file.name] = []
if result.entropy >= self.trigger_level and (not self.blocks[result.file.name] or self.blocks[result.file.name][-1].end is not None):
self.blocks[result.file.name].append(EntropyBlock(start=result.offset + self.BLOCK_OFFSET))
self.blocks[result.file.name].append(
EntropyBlock(start=result.offset + self.BLOCK_OFFSET))
elif result.entropy < self.trigger_level and self.blocks[result.file.name] and self.blocks[result.file.name][-1].end is None:
self.blocks[result.file.name][-1].end = result.offset - self.BLOCK_OFFSET
self.blocks[result.file.name][
-1].end = result.offset - self.BLOCK_OFFSET
def run(self):
for fp in iter(self.next_file, None):
if has_key(self.blocks, fp.name):
self.header()
for block in self.blocks[fp.name]:
if block.end is None:
......@@ -173,7 +183,7 @@ class HeuristicCompressionAnalyzer(Module):
while j < dlen:
chi.reset()
data = d[j:j+self.block_size]
data = d[j:j + self.block_size]
if len(data) < self.block_size:
break
......@@ -181,7 +191,7 @@ class HeuristicCompressionAnalyzer(Module):
if chi.chisq() >= self.CHI_CUTOFF:
num_error += 1
j += self.block_size
if (j + i) > block.length:
......@@ -194,5 +204,6 @@ class HeuristicCompressionAnalyzer(Module):
else:
verdict = 'High entropy data, best guess: encrypted'
desc = '%s, size: %d, %d low entropy blocks' % (verdict, block.length, num_error)
desc = '%s, size: %d, %d low entropy blocks' % (
verdict, block.length, num_error)
self.result(offset=block.start, description=desc, file=fp)
......@@ -5,13 +5,13 @@ import binwalk.core.common as common
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg
class HexDiff(Module):
COLORS = {
'red' : '31',
'green' : '32',
'blue' : '34',
'red': '31',
'green': '32',
'blue': '34',
}
SEPERATORS = ['\\', '/']
......@@ -23,34 +23,34 @@ class HexDiff(Module):
TITLE = "Binary Diffing"
CLI = [
Option(short='W',
long='hexdump',
kwargs={'enabled' : True},
description='Perform a hexdump / diff of a file or files'),
Option(short='G',
long='green',
kwargs={'show_green' : True},
description='Only show lines containing bytes that are the same among all files'),
Option(short='i',
long='red',
kwargs={'show_red' : True},
description='Only show lines containing bytes that are different among all files'),
Option(short='U',
long='blue',
kwargs={'show_blue' : True},
description='Only show lines containing bytes that are different among some files'),
Option(short='w',
long='terse',
kwargs={'terse' : True},
description='Diff all files, but only display a hex dump of the first file'),
Option(short='W',
long='hexdump',
kwargs={'enabled': True},
description='Perform a hexdump / diff of a file or files'),
Option(short='G',
long='green',
kwargs={'show_green': True},
description='Only show lines containing bytes that are the same among all files'),
Option(short='i',
long='red',
kwargs={'show_red': True},
description='Only show lines containing bytes that are different among all files'),
Option(short='U',
long='blue',
kwargs={'show_blue': True},
description='Only show lines containing bytes that are different among some files'),
Option(short='w',
long='terse',
kwargs={'terse': True},
description='Diff all files, but only display a hex dump of the first file'),
]
KWARGS = [
Kwarg(name='show_red', default=False),
Kwarg(name='show_blue', default=False),
Kwarg(name='show_green', default=False),
Kwarg(name='terse', default=False),
Kwarg(name='enabled', default=False),
Kwarg(name='show_red', default=False),
Kwarg(name='show_blue', default=False),
Kwarg(name='show_green', default=False),
Kwarg(name='terse', default=False),
Kwarg(name='enabled', default=False),
]
RESULT_FORMAT = "%s\n"
......@@ -98,7 +98,7 @@ class HexDiff(Module):
except IndexError as e:
diff_count += 1
if diff_count == len(target_data)-1:
if diff_count == len(target_data) - 1:
color = "red"
elif diff_count > 0:
color = "blue"
......@@ -149,7 +149,8 @@ class HexDiff(Module):
hexbyte = "XX"
asciibyte = "."
else:
(hexbyte, asciibyte) = self.hexascii(block_data, block_data[fp][i], i)
(hexbyte, asciibyte) = self.hexascii(
block_data, block_data[fp][i], i)
hexline += "%s " % hexbyte
asciiline += "%s" % asciibyte
......@@ -178,11 +179,13 @@ class HexDiff(Module):
self.status.completed += self.block
def init(self):
# To mimic expected behavior, if all options are False, we show everything
# To mimic expected behavior, if all options are False, we show
# everything
if not any([self.show_red, self.show_green, self.show_blue]):
self.show_red = self.show_green = self.show_blue = True
# Always disable terminal formatting, as it won't work properly with colorized output
# Always disable terminal formatting, as it won't work properly with
# colorized output
self.config.display.fit_to_screen = False
# Set the block size (aka, hexdump line size)
......@@ -205,7 +208,8 @@ class HexDiff(Module):
file_count = 1
else:
file_count = len(self.hex_target_files)
self.HEADER_FORMAT = "OFFSET " + (("%%-%ds " % header_width) * file_count) + "\n"
self.HEADER_FORMAT = "OFFSET " + \
(("%%-%ds " % header_width) * file_count) + "\n"
# Build the header argument list
self.HEADER = [fp.name for fp in self.hex_target_files]
......@@ -225,4 +229,3 @@ class HexDiff(Module):
self.header()
self.diff_files(self.hex_target_files)
self.footer()
# Basic signature scan module. This is the default (and primary) feature of binwalk.
# Basic signature scan module. This is the default (and primary) feature
# of binwalk.
import binwalk.core.magic
from binwalk.core.module import Module, Option, Kwarg
class Signature(Module):
TITLE = "Signature Scan"
ORDER = 10
CLI = [
Option(short='B',
long='signature',
kwargs={'enabled' : True, 'explicit_signature_scan' : True},
description='Scan target file(s) for common file signatures'),
Option(short='R',
long='raw',
kwargs={'enabled' : True, 'raw_bytes' : []},
type=list,
dtype=str.__name__,
description='Scan target file(s) for the specified sequence of bytes'),
Option(short='A',
long='opcodes',
kwargs={'enabled' : True, 'search_for_opcodes' : True},
description='Scan target file(s) for common executable opcode signatures'),
Option(short='m',
long='magic',
kwargs={'enabled' : True, 'magic_files' : []},
type=list,
dtype='file',
description='Specify a custom magic file to use'),
Option(short='b',
long='dumb',
kwargs={'dumb_scan' : True},
description='Disable smart signature keywords'),
Option(short='I',
long='invalid',
kwargs={'show_invalid' : True},
description='Show results marked as invalid'),
Option(short='x',
long='exclude',
kwargs={'exclude_filters' : []},
type=list,
dtype=str.__name__,
description='Exclude results that match <str>'),
Option(short='y',
long='include',
kwargs={'include_filters' : []},
type=list,
dtype=str.__name__,
description='Only show results that match <str>'),
Option(short='B',
long='signature',
kwargs={'enabled': True, 'explicit_signature_scan': True},
description='Scan target file(s) for common file signatures'),
Option(short='R',
long='raw',
kwargs={'enabled': True, 'raw_bytes': []},
type=list,
dtype=str.__name__,
description='Scan target file(s) for the specified sequence of bytes'),
Option(short='A',
long='opcodes',
kwargs={'enabled': True, 'search_for_opcodes': True},
description='Scan target file(s) for common executable opcode signatures'),
Option(short='m',
long='magic',
kwargs={'enabled': True, 'magic_files': []},
type=list,
dtype='file',
description='Specify a custom magic file to use'),
Option(short='b',
long='dumb',
kwargs={'dumb_scan': True},
description='Disable smart signature keywords'),
Option(short='I',
long='invalid',
kwargs={'show_invalid': True},
description='Show results marked as invalid'),
Option(short='x',
long='exclude',
kwargs={'exclude_filters': []},
type=list,
dtype=str.__name__,
description='Exclude results that match <str>'),
Option(short='y',
long='include',
kwargs={'include_filters': []},
type=list,
dtype=str.__name__,
description='Only show results that match <str>'),
]
KWARGS = [
Kwarg(name='enabled', default=False),
Kwarg(name='show_invalid', default=False),
Kwarg(name='include_filters', default=[]),
Kwarg(name='exclude_filters', default=[]),
Kwarg(name='raw_bytes', default=[]),
Kwarg(name='search_for_opcodes', default=False),
Kwarg(name='explicit_signature_scan', default=False),
Kwarg(name='dumb_scan', default=False),
Kwarg(name='magic_files', default=[]),
Kwarg(name='enabled', default=False),
Kwarg(name='show_invalid', default=False),
Kwarg(name='include_filters', default=[]),
Kwarg(name='exclude_filters', default=[]),
Kwarg(name='raw_bytes', default=[]),
Kwarg(name='search_for_opcodes', default=False),
Kwarg(name='explicit_signature_scan', default=False),
Kwarg(name='dumb_scan', default=False),
Kwarg(name='magic_files', default=[]),
]
VERBOSE_FORMAT = "%s %d"
......@@ -67,16 +69,19 @@ class Signature(Module):
def init(self):
self.one_of_many = None
# Append the user's magic file first so that those signatures take precedence
# Append the user's magic file first so that those signatures take
# precedence
if self.search_for_opcodes:
self.magic_files = [
self.config.settings.user.binarch,
self.config.settings.system.binarch,
]
# Use the system default magic file if no other was specified, or if -B was explicitly specified
# Use the system default magic file if no other was specified, or if -B
# was explicitly specified
if (not self.magic_files and not self.raw_bytes) or self.explicit_signature_scan:
self.magic_files += self.config.settings.user.magic + self.config.settings.system.magic
self.magic_files += self.config.settings.user.magic + \
self.config.settings.system.magic
# Initialize libmagic
self.magic = binwalk.core.magic.Magic(include=self.include_filters,
......@@ -87,13 +92,16 @@ class Signature(Module):
if self.raw_bytes:
raw_signatures = []
for raw_bytes in self.raw_bytes:
raw_signatures.append("0 string %s %s" % (raw_bytes, raw_bytes))
binwalk.core.common.debug("Parsing raw signatures: %s" % str(raw_signatures))
raw_signatures.append(
"0 string %s %s" % (raw_bytes, raw_bytes))
binwalk.core.common.debug(
"Parsing raw signatures: %s" % str(raw_signatures))
self.magic.parse(raw_signatures)
# Parse the magic file(s)
if self.magic_files:
binwalk.core.common.debug("Loading magic files: %s" % str(self.magic_files))
binwalk.core.common.debug(
"Loading magic files: %s" % str(self.magic_files))
for f in self.magic_files:
self.magic.load(f)
......@@ -116,7 +124,8 @@ class Signature(Module):
r.valid = False
if r.valid:
# Don't keep displaying signatures that repeat a bunch of times (e.g., JFFS2 nodes)
# Don't keep displaying signatures that repeat a bunch of times
# (e.g., JFFS2 nodes)
if r.id == self.one_of_many:
r.display = False
elif r.many:
......@@ -156,14 +165,17 @@ class Signature(Module):
r.file = fp
# Register the result for further processing/display
# self.result automatically calls self.validate for result validation
# self.result automatically calls self.validate for result
# validation
self.result(r=r)
# Is this a valid result and did it specify a jump-to-offset keyword, and are we doing a "smart" scan?
# Is this a valid result and did it specify a jump-to-offset
# keyword, and are we doing a "smart" scan?
if r.valid and r.jump > 0 and not self.dumb_scan:
absolute_jump_offset = r.offset + r.jump
current_block_offset = relative_offset + r.jump
#print ("Jumping to: 0x%X (0x%X)..." % (absolute_jump_offset, current_block_offset))
# print ("Jumping to: 0x%X (0x%X)..." %
# (absolute_jump_offset, current_block_offset))
# If the jump-to-offset is beyond the confines of the current block, seek the file to
# that offset and quit processing this block of data.
......@@ -176,4 +188,3 @@ class Signature(Module):
self.header()
self.scan_file(fp)
self.footer()
......@@ -2,38 +2,42 @@ import os
import binwalk.core.common
import binwalk.core.plugin
class ArcadyanDeobfuscator(binwalk.core.plugin.Plugin):
'''
Deobfuscator for known Arcadyan firmware obfuscation(s).
'''
MODULES = ['Signature']
OBFUSCATION_MAGIC_SIZE = 4
MAX_IMAGE_SIZE = 0x1B0000
BLOCK_SIZE = 32
BLOCK1_OFFSET = 4
BLOCK2_OFFSET = 0x68
MIN_FILE_SIZE = (OBFUSCATION_MAGIC_SIZE + BLOCK2_OFFSET + BLOCK_SIZE)
OBFUSCATION_MAGIC_SIZE = 4
MAX_IMAGE_SIZE = 0x1B0000
BLOCK_SIZE = 32
BLOCK1_OFFSET = 4
BLOCK2_OFFSET = 0x68
MIN_FILE_SIZE = (
OBFUSCATION_MAGIC_SIZE + BLOCK2_OFFSET + BLOCK_SIZE)
BLOCK1_START = BLOCK1_OFFSET
BLOCK1_END = BLOCK1_START + BLOCK_SIZE
BLOCK1_START = BLOCK1_OFFSET
BLOCK1_END = BLOCK1_START + BLOCK_SIZE
BLOCK2_START = BLOCK2_OFFSET
BLOCK2_END = BLOCK2_OFFSET + BLOCK_SIZE
BLOCK2_START = BLOCK2_OFFSET
BLOCK2_END = BLOCK2_OFFSET + BLOCK_SIZE
P1_START = 0
P1_END = BLOCK1_OFFSET
P1_START = 0
P1_END = BLOCK1_OFFSET
P2_START = BLOCK1_END
P2_END = BLOCK2_START
P2_START = BLOCK1_END
P2_END = BLOCK2_START
P3_START = BLOCK2_END
P3_START = BLOCK2_END
def init(self):
if self.module.extractor.enabled:
self.module.extractor.add_rule(regex="^obfuscated arcadyan firmware",
extension="obfuscated",
cmd=self.extractor)
self.module.extractor.add_rule(
regex="^obfuscated arcadyan firmware",
extension="obfuscated",
cmd=self.extractor)
def extractor(self, fname):
deobfuscated = None
......@@ -55,22 +59,25 @@ class ArcadyanDeobfuscator(binwalk.core.plugin.Plugin):
# Nibble-swap each byte in block 1
nswap = ''
for i in range(self.BLOCK1_START, self.BLOCK1_END):
nswap += chr(((ord(deobfuscated[i]) & 0x0F) << 4) + ((ord(deobfuscated[i]) & 0xF0) >> 4));
deobfuscated = deobfuscated[self.P1_START:self.P1_END] + nswap + deobfuscated[self.BLOCK1_END:]
nswap += chr(((ord(deobfuscated[i]) & 0x0F) << 4) + (
(ord(deobfuscated[i]) & 0xF0) >> 4))
deobfuscated = deobfuscated[
self.P1_START:self.P1_END] + nswap + deobfuscated[self.BLOCK1_END:]
# Byte-swap each byte pair in block 1
bswap = ''
i = self.BLOCK1_START
while i < self.BLOCK1_END:
bswap += deobfuscated[i+1] + deobfuscated[i]
bswap += deobfuscated[i + 1] + deobfuscated[i]
i += 2
deobfuscated = deobfuscated[self.P1_START:self.P1_END] + bswap + deobfuscated[self.BLOCK1_END:]
deobfuscated = deobfuscated[
self.P1_START:self.P1_END] + bswap + deobfuscated[self.BLOCK1_END:]
if deobfuscated:
out = binwalk.core.common.BlockFile((os.path.splitext(fname)[0] + '.deobfuscated'), "wb")
out = binwalk.core.common.BlockFile(
(os.path.splitext(fname)[0] + '.deobfuscated'), "wb")
out.write(deobfuscated)
out.close()
return True
else:
return False
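The two transforms applied above can be exercised on their own; the following is a minimal bytes-based sketch (the plugin itself slices Python 2 style strings), covering the nibble swap and the byte-pair swap used for block 1:

def nibble_swap(block):
    # Swap the high and low nibble of every byte: 0xAB -> 0xBA.
    return bytes(((b & 0x0F) << 4) | ((b & 0xF0) >> 4) for b in block)

def byte_swap_pairs(block):
    # Swap every adjacent pair of bytes: b0 b1 b2 b3 -> b1 b0 b3 b2.
    out = bytearray()
    for i in range(0, len(block) - 1, 2):
        out += bytes([block[i + 1], block[i]])
    return bytes(out)

# Example on a 32-byte block (BLOCK_SIZE above):
data = bytes(range(32))
print(nibble_swap(data).hex())
print(byte_swap_pairs(data).hex())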
#import binwalk.core.C
# import binwalk.core.C
import binwalk.core.plugin
#from binwalk.core.common import *
# from binwalk.core.common import *
class CompressdPlugin(binwalk.core.plugin.Plugin):
# '''
......@@ -9,25 +10,25 @@ class CompressdPlugin(binwalk.core.plugin.Plugin):
MODULES = ['Signature']
#READ_SIZE = 64
# READ_SIZE = 64
#COMPRESS42 = "compress42"
#COMPRESS42_FUNCTIONS = [
# COMPRESS42 = "compress42"
# COMPRESS42_FUNCTIONS = [
# binwalk.core.C.Function(name="is_compressed", type=bool),
#]
#comp = None
# comp = None
#def init(self):
#self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)
# def init(self):
# self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)
# This plugin is currently disabled due to the need to move away from supporting C
# libraries and into a pure Python project, for cross-platform support and ease of
# installation / package maintenance. A Python implementation will likely need to
# be custom developed in the future, but for now, since this compression format is
# not very common, especially in firmware, simply disable it.
#self.comp = None
# self.comp = None
#def scan(self, result):
# def scan(self, result):
# if self.comp and result.file and result.description.lower().startswith("compress'd data"):
# fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.READ_SIZE)
# compressed_data = fd.read(self.READ_SIZE)
......@@ -35,5 +36,3 @@ class CompressdPlugin(binwalk.core.plugin.Plugin):
# if not self.comp.is_compressed(compressed_data, len(compressed_data)):
# result.valid = False
......@@ -2,7 +2,9 @@ import os
import subprocess
import binwalk.core.plugin
class CPIOPlugin(binwalk.core.plugin.Plugin):
'''
Ensures that ASCII CPIO archive entries only get extracted once.
Also provides an internal CPIO extraction wrapper around the Unix
......@@ -40,10 +42,11 @@ class CPIOPlugin(binwalk.core.plugin.Plugin):
return
try:
result = subprocess.call(['cpio', '-d', '-i', '--no-absolute-filenames'],
stdin=fpin,
stderr=fperr,
stdout=fperr)
result = subprocess.call(
['cpio', '-d', '-i', '--no-absolute-filenames'],
stdin=fpin,
stderr=fperr,
stdout=fperr)
except OSError:
result = -1
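Outside the plugin, the same external cpio invocation can be sketched as a standalone helper; the archive path and error-log name below are placeholders, and extraction lands in the current working directory rather than the extractor-managed one:

import subprocess

def extract_cpio(archive_path, errlog='cpio.err'):
    # Unpack an ASCII cpio archive with the system cpio utility; returns True
    # on success, False if cpio fails or is not installed.
    with open(archive_path, 'rb') as fpin, open(errlog, 'wb') as fperr:
        try:
            return subprocess.call(
                ['cpio', '-d', '-i', '--no-absolute-filenames'],
                stdin=fpin, stdout=fperr, stderr=fperr) == 0
        except OSError:
            return False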
......@@ -70,7 +73,8 @@ class CPIOPlugin(binwalk.core.plugin.Plugin):
def _get_file_name_length(self, description):
length = 0
if 'file name length: "' in description:
length_string = description.split('file name length: "')[1].split('"')[0]
length_string = description.split(
'file name length: "')[1].split('"')[0]
length = int(length_string, 0)
return length
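A quick illustration of the parsing above; the sample description string is made up for the example, and int(x, 0) accepts either hex ("0x...") or decimal text:

description = 'ASCII cpio archive (SVR4 with no CRC), file name: "./bin/sh", file name length: "0x00000008"'
length_string = description.split('file name length: "')[1].split('"')[0]
print(int(length_string, 0))  # -> 8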
......@@ -78,12 +82,14 @@ class CPIOPlugin(binwalk.core.plugin.Plugin):
if result.valid:
# ASCII CPIO archives consist of multiple entries, ending with an entry named 'TRAILER!!!'.
# Displaying each entry is useful, as it shows what files are contained in the archive,
# but we only want to extract the archive when the first entry is found.
# but we only want to extract the archive when the first entry is
# found.
if result.description.startswith('ASCII cpio archive'):
# Validate the reported name length
file_name = self._get_file_name(result.description)
file_name_length = self._get_file_name_length(result.description)
file_name_length = self._get_file_name_length(
result.description)
if len(file_name) != file_name_length:
result.valid = False
return
......@@ -91,7 +97,8 @@ class CPIOPlugin(binwalk.core.plugin.Plugin):
self.consecutive_hits += 1
if not self.found_archive or self.found_archive_in_file != result.file.name:
# This is the first entry. Set found_archive and allow the scan to continue normally.
# This is the first entry. Set found_archive and allow the
# scan to continue normally.
self.found_archive_in_file = result.file.name
self.found_archive = True
result.extract = True
......@@ -113,5 +120,6 @@ class CPIOPlugin(binwalk.core.plugin.Plugin):
self.consecutive_hits = 0
elif self.consecutive_hits >= 4:
# Ignore other stuff until the end of CPIO is found
# TODO: It would be better to jump to the end of this CPIO entry rather than make this assumption...
# TODO: It would be better to jump to the end of this CPIO
# entry rather than make this assumption...
result.valid = False
......@@ -7,21 +7,23 @@ try:
except ImportError as e:
pass
class RomFSCommon(object):
def _read_next_word(self):
value = struct.unpack("%sL" % self.endianess, self.data[self.index:self.index+4])[0]
value = struct.unpack(
"%sL" % self.endianess, self.data[self.index:self.index + 4])[0]
self.index += 4
return value
def _read_next_uid(self):
uid = int(self.data[self.index:self.index+4])
uid = int(self.data[self.index:self.index + 4])
self.index += 4
return uid
def _read_next_block(self, size):
size = int(size)
data = self.data[self.index:self.index+size]
data = self.data[self.index:self.index + size]
self.index += size
return data
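The word reads above rely on struct format prefixes for byte order: "<" selects little-endian and ">" big-endian for the 4-byte unsigned "L" field. A standalone illustration:

import struct

data = b'\x01\x00\x00\x00\x00\x00\x00\x02'
little, = struct.unpack("<L", data[0:4])  # 0x00000001
big,    = struct.unpack(">L", data[4:8])  # 0x00000002
print(hex(little), hex(big))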
......@@ -41,10 +43,11 @@ class RomFSCommon(object):
self.index += 1
return data
class RomFSEntry(RomFSCommon):
DIR_STRUCT_MASK = 0x00000001
DATA_MASK = 0x00000008
DATA_MASK = 0x00000008
COMPRESSED_MASK = 0x005B0000
def __init__(self, data, endianess="<"):
......@@ -61,6 +64,7 @@ class RomFSEntry(RomFSCommon):
self.unknown5 = self._read_next_word()
self.uid = self._read_next_uid()
class RomFSDirStruct(RomFSCommon):
SIZE = 0x20
......@@ -94,17 +98,20 @@ class RomFSDirStruct(RomFSCommon):
if count == 0:
mod = self.SIZE - total_size
else:
mod = self.SIZE - int(total_size - (count*self.SIZE))
mod = self.SIZE - int(total_size - (count * self.SIZE))
if mod > 0:
remainder = self._read_next_block(mod)
yield (uid, entry)
class FileContainer(object):
def __init__(self):
pass
class RomFS(object):
SUPERBLOCK_SIZE = 0x20
......@@ -145,7 +152,8 @@ class RomFS(object):
while True:
try:
entry = RomFSEntry(self.data[offset:offset+self.FILE_ENTRY_SIZE], endianess=self.endianess)
entry = RomFSEntry(
self.data[offset:offset + self.FILE_ENTRY_SIZE], endianess=self.endianess)
except ValueError as e:
break
......@@ -160,7 +168,8 @@ class RomFS(object):
if entry.type & entry.DIR_STRUCT_MASK:
entries[entry.uid].type = "directory"
ds = RomFSDirStruct(self.data[entry.offset:entry.offset+entry.size], endianess=self.endianess)
ds = RomFSDirStruct(
self.data[entry.offset:entry.offset + entry.size], endianess=self.endianess)
for (uid, name) in ds.ls:
if not uid in entries:
entries[uid] = FileContainer()
......@@ -184,7 +193,9 @@ if __name__ == '__main__':
print ("Usage: %s <input file> <output directory>" % sys.argv[0])
sys.exit(1)
class DlinkROMFSExtractPlugin(binwalk.core.plugin.Plugin):
'''
Gzip extractor plugin.
'''
......@@ -193,7 +204,8 @@ class DlinkROMFSExtractPlugin(binwalk.core.plugin.Plugin):
def init(self):
# If the extractor is enabled for the module we're currently loaded
# into, then register self.extractor as a D-Link ROMFS file system extraction rule.
# into, then register self.extractor as a D-Link ROMFS file system
# extraction rule.
if self.module.extractor.enabled:
self.module.extractor.add_rule(txtrule=None,
regex="^d-link romfs filesystem",
......
......@@ -2,7 +2,9 @@ import os
import gzip
import binwalk.core.plugin
class GzipExtractPlugin(binwalk.core.plugin.Plugin):
'''
Gzip extractor plugin.
'''
......
......@@ -3,7 +3,9 @@ import binwalk.core.compat
import binwalk.core.plugin
from binwalk.core.common import BlockFile
class GzipValidPlugin(binwalk.core.plugin.Plugin):
'''
Validates gzip compressed data. Almost identical to zlibvalid.py.
'''
......@@ -15,7 +17,8 @@ class GzipValidPlugin(binwalk.core.plugin.Plugin):
# If this result is a gzip signature match, try to decompress the data
if result.file and result.description.lower().startswith('gzip'):
# Seek to and read the suspected gzip data
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
fd = self.module.config.open_file(
result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
......@@ -40,8 +43,7 @@ class GzipValidPlugin(binwalk.core.plugin.Plugin):
except zlib.error as e:
error = str(e)
# Truncated input data results in error -5.
# gzip uses different checksums than zlib, which results in error -3.
# gzip uses different checksums than zlib, which results in
# error -3.
if not error.startswith("Error -5") and not error.startswith("Error -3"):
result.valid = False
......@@ -13,6 +13,7 @@ except ImportError as e:
class HilinkDecryptor(binwalk.core.plugin.Plugin):
'''
Plugin to decrypt, validate, and extract Hilink encrypted firmware.
'''
......@@ -28,11 +29,12 @@ class HilinkDecryptor(binwalk.core.plugin.Plugin):
self.enabled = True
if self.enabled is True and self.module.extractor.enabled is True:
# Add an extraction rule for encrypted Hilink firmware signature results
self.module.extractor.add_rule(regex="^%s" % self.SIGNATURE_DESCRIPTION,
extension="enc",
cmd=self._decrypt_and_extract)
# Add an extraction rule for encrypted Hilink firmware signature
# results
self.module.extractor.add_rule(
regex="^%s" % self.SIGNATURE_DESCRIPTION,
extension="enc",
cmd=self._decrypt_and_extract)
def _decrypt_and_extract(self, fname):
'''
......@@ -68,25 +70,31 @@ class HilinkDecryptor(binwalk.core.plugin.Plugin):
if self.enabled is True:
if result.valid is True:
if result.description.lower().startswith(self.SIGNATURE_DESCRIPTION) is True:
# Read in the first 64 bytes of the suspected encrypted uImage header
fd = self.module.config.open_file(result.file.name, offset=result.offset)
encrypted_header_data = binwalk.core.compat.str2bytes(fd.read(64))
# Read in the first 64 bytes of the suspected encrypted
# uImage header
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
encrypted_header_data = binwalk.core.compat.str2bytes(
fd.read(64))
fd.close()
# Decrypt the header
decrypted_header_data = self._hilink_decrypt(encrypted_header_data)
decrypted_header_data = self._hilink_decrypt(
encrypted_header_data)
# Pull out the image size and image name fields from the decrypted uImage header
# and add them to the printed description.
result.size = struct.unpack(b">L", decrypted_header_data[12:16])[0]
result.size = struct.unpack(
b">L", decrypted_header_data[12:16])[0]
result.description += ", size: %d" % (result.size)
# NOTE: The description field should be 32 bytes? Hilink seems to use only 24 bytes for this field,
# even though the header size is still 64 bytes?
result.description += ', image name: "%s"' % binwalk.core.compat.bytes2str(decrypted_header_data[32:56]).strip("\x00")
result.description += ', image name: "%s"' % binwalk.core.compat.bytes2str(
decrypted_header_data[32:56]).strip("\x00")
# Do some basic validation on the decrypted size and image name fields
# Do some basic validation on the decrypted size and image
# name fields
if result.size > (result.file.size - result.offset):
result.valid = False
if not all(c in string.printable for c in result.description):
result.valid = False
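The fields pulled from the decrypted header follow the standard U-Boot uImage layout: a big-endian 32-bit data size at offset 12 and the image name starting at offset 32 (of which, as noted above, Hilink appears to use only 24 bytes). A minimal sketch of that slicing on a 64-byte header buffer:

import struct

def parse_uimage_fields(header):
    # header: the 64-byte decrypted uImage header as bytes.
    size = struct.unpack(b">L", header[12:16])[0]
    name = header[32:56].rstrip(b"\x00").decode("ascii", "replace")
    return size, name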
......@@ -2,7 +2,9 @@ import struct
import binascii
import binwalk.core.plugin
class JFFS2ValidPlugin(binwalk.core.plugin.Plugin):
'''
Helps validate JFFS2 signature results.
......@@ -23,7 +25,8 @@ class JFFS2ValidPlugin(binwalk.core.plugin.Plugin):
header_crc = struct.unpack("<I", node_header[8:12])[0]
# Calculate the actual CRC
calculated_header_crc = (binascii.crc32(node_header[0:8], -1) ^ -1) & 0xffffffff
calculated_header_crc = (
binascii.crc32(node_header[0:8], -1) ^ -1) & 0xffffffff
# Make sure they match
return (header_crc == calculated_header_crc)
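Restated as a standalone helper, the check above computes a CRC32 over the first 8 header bytes (magic, node type, total length), seeded with -1 and post-inverted, and compares it to the little-endian CRC stored at offset 8:

import struct
import binascii

def jffs2_header_crc_ok(node_header):
    # node_header: at least the first 12 bytes of a JFFS2 node, as bytes.
    stored = struct.unpack("<I", node_header[8:12])[0]
    computed = (binascii.crc32(node_header[0:8], -1) ^ -1) & 0xffffffff
    return stored == computed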
......@@ -32,16 +35,15 @@ class JFFS2ValidPlugin(binwalk.core.plugin.Plugin):
if result.file and result.description.lower().startswith('jffs2 filesystem'):
# Seek to and read the suspected JFFS2 node header
fd = self.module.config.open_file(result.file.name, offset=result.offset)
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
# JFFS2 headers are only 12 bytes in size, but reading a larger chunk of
# data from disk speeds up repeated access and reduces the performance
# hit (disk caching?).
#
# TODO: Should this plugin validate the *entire* JFFS2 file system, rather
# than letting the signature module find every single JFFS2 node?
# than letting the signature module find every single JFFS2 node?
node_header = fd.read(1024)
fd.close()
result.valid = self._check_crc(node_header[0:12])
import os
import binwalk.core.plugin
class LZMAExtractPlugin(binwalk.core.plugin.Plugin):
'''
LZMA extractor plugin.
'''
......@@ -11,7 +13,8 @@ class LZMAExtractPlugin(binwalk.core.plugin.Plugin):
try:
# The lzma package's decompress() in Python 2.0 does not handle multiple
# compressed streams; only the first stream is extracted.
# backports.lzma package could be used to keep consistent behaviour.
# backports.lzma package could be used to keep consistent
# behaviour.
try:
import lzma
except ImportError:
......
......@@ -4,7 +4,9 @@ import binwalk.core.plugin
from binwalk.core.compat import *
from binwalk.core.common import BlockFile
class LZMAModPlugin(binwalk.core.plugin.Plugin):
'''
Finds and extracts modified LZMA files commonly found in cable modems.
Based on Bernardo Rodrigues' work: http://w00tsec.blogspot.com/2013/11/unpacking-firmware-images-from-cable.html
......@@ -27,11 +29,14 @@ class LZMAModPlugin(binwalk.core.plugin.Plugin):
# Try extracting the LZMA file without modification first
result = self.module.extractor.execute(self.original_cmd, fname)
# If the external extractor was successful (True) or didn't exist (None), don't do anything.
# If the external extractor was successful (True) or didn't exist
# (None), don't do anything.
if result not in [True, None]:
out_name = os.path.splitext(fname)[0] + '-patched' + os.path.splitext(fname)[1]
out_name = os.path.splitext(fname)[
0] + '-patched' + os.path.splitext(fname)[1]
fp_out = BlockFile(out_name, 'w')
# Use self.module.config.open_file here to ensure that other config settings (such as byte-swapping) are honored
# Use self.module.config.open_file here to ensure that other config
# settings (such as byte-swapping) are honored
fp_in = self.module.config.open_file(fname, offset=0, length=0)
fp_in.set_block_size(peek=0)
i = 0
......@@ -51,16 +56,18 @@ class LZMAModPlugin(binwalk.core.plugin.Plugin):
fp_in.close()
fp_out.close()
# Overwrite the original file so that it can be cleaned up if -r was specified
# Overwrite the original file so that it can be cleaned up if -r
# was specified
shutil.move(out_name, fname)
result = self.module.extractor.execute(self.original_cmd, fname)
return result
def scan(self, result):
# The modified cable modem LZMA headers all have valid dictionary sizes and a properties byte of 0x5D.
# The modified cable modem LZMA headers all have valid dictionary sizes
# and a properties byte of 0x5D.
if result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if "properties: 0x5D" in result.description and "invalid dictionary size" not in result.description:
result.valid = True
result.description = result.description.split("invalid uncompressed size")[0] + "missing uncompressed size"
result.description = result.description.split(
"invalid uncompressed size")[0] + "missing uncompressed size"
......@@ -2,7 +2,9 @@ import binwalk.core.plugin
import binwalk.core.compat
from binwalk.core.common import BlockFile
class LZMAPlugin(binwalk.core.plugin.Plugin):
'''
Validates lzma signature results.
'''
......@@ -29,7 +31,8 @@ class LZMAPlugin(binwalk.core.plugin.Plugin):
valid = True
if self.decompressor is not None:
# The only acceptable exceptions are those indicating that the input data was truncated.
# The only acceptable exceptions are those indicating that the
# input data was truncated.
try:
self.decompressor(binwalk.core.compat.str2bytes(data))
except IOError as e:
......@@ -49,7 +52,8 @@ class LZMAPlugin(binwalk.core.plugin.Plugin):
if result.valid and result.file and result.description.lower().startswith('lzma compressed data'):
# Seek to and read the suspected lzma data
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
fd = self.module.config.open_file(
result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
......@@ -59,4 +63,3 @@ class LZMAPlugin(binwalk.core.plugin.Plugin):
data = data[:5] + self.FAKE_LZMA_SIZE + data[5:]
if not self.is_valid_lzma(data):
result.valid = False
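The splice at offset 5 follows the standalone ("LZMA alone") header layout: one properties byte, a 4-byte little-endian dictionary size, then an 8-byte uncompressed size, where all 0xFF bytes mean "unknown size". A sketch of the fix-up, with the filler value assumed here since FAKE_LZMA_SIZE is defined outside this hunk:

UNKNOWN_SIZE = b"\xFF" * 8  # assumed stand-in for the plugin's FAKE_LZMA_SIZE

def insert_missing_lzma_size(data):
    # Re-insert the 8-byte uncompressed-size field that some firmware images
    # strip from the 13-byte LZMA header before handing data to a decompressor.
    return data[:5] + UNKNOWN_SIZE + data[5:]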
......@@ -2,6 +2,7 @@ import time
import math
import binwalk.core.plugin
class TarPlugin(binwalk.core.plugin.Plugin):
MODULES = ['Signature']
......@@ -41,24 +42,27 @@ class TarPlugin(binwalk.core.plugin.Plugin):
if result.description.lower().startswith('posix tar archive'):
is_tar = True
file_offset = result.offset
fd = self.module.config.open_file(result.file.name, offset=result.offset)
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
while is_tar:
# read in the tar header struct
buf = fd.read(self.TAR_BLOCKSIZE)
# check to see if we are still in a tarball
if buf[257:262] == 'ustar':
# get size of tarred file and convert to blocks (plus 1 to include header)
# get size of tarred file and convert to blocks (plus 1 to
# include header)
try:
size = self.nti(buf[124:136])
blocks = math.ceil(size/float(self.TAR_BLOCKSIZE)) + 1
blocks = math.ceil(
size / float(self.TAR_BLOCKSIZE)) + 1
except ValueError as e:
is_tar = False
break
# update file offset for next file in tarball
file_offset += int(self.TAR_BLOCKSIZE*blocks)
file_offset += int(self.TAR_BLOCKSIZE * blocks)
if file_offset >= result.file.size:
# we hit the end of the file
......@@ -66,6 +70,6 @@ class TarPlugin(binwalk.core.plugin.Plugin):
else:
fd.seek(file_offset)
else:
is_tar = False
is_tar = False
result.jump = file_offset
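The offset arithmetic above follows the ustar on-disk format: the magic string 'ustar' sits at offset 257 of each 512-byte header block and the file size is an octal text field at offset 124; each member occupies one header block plus ceil(size / 512) data blocks. A standalone sketch of the same walk over a bytes buffer, with the plugin's nti() octal parser simplified to int(field, 8):

import math

TAR_BLOCKSIZE = 512

def tar_member_offsets(buf):
    # Yield the offset of each ustar member header found in buf.
    offset = 0
    while buf[offset + 257:offset + 262] == b'ustar':
        yield offset
        field = buf[offset + 124:offset + 136].strip(b' \x00')
        size = int(field, 8) if field else 0
        blocks = int(math.ceil(size / float(TAR_BLOCKSIZE))) + 1
        offset += TAR_BLOCKSIZE * blocks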
......@@ -3,14 +3,16 @@ import binascii
import binwalk.core.plugin
import binwalk.core.compat
class UBIValidPlugin(binwalk.core.plugin.Plugin):
'''
Helps validate UBI erase count signature results.
Checks header CRC and calculates jump value
'''
MODULES = ['Signature']
current_file=None
current_file = None
last_ec_hdr_offset = None
peb_size = None
......@@ -26,15 +28,15 @@ class UBIValidPlugin(binwalk.core.plugin.Plugin):
def _process_result(self, result):
if self.current_file == result.file.name:
result.display=False
result.display = False
else:
# Reset everything in case new file is encountered
self.peb_size=None
self.last_ec_hdr_offset=None
self.peb_size=None
self.peb_size = None
self.last_ec_hdr_offset = None
self.peb_size = None
# Display result and trigger extraction
result.display=True
result.display = True
self.current_file = result.file.name
......@@ -54,7 +56,8 @@ class UBIValidPlugin(binwalk.core.plugin.Plugin):
def scan(self, result):
if result.file and result.description.lower().startswith('ubi erase count header'):
# Seek to and read the suspected UBI erase count header
fd = self.module.config.open_file(result.file.name, offset=result.offset)
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
ec_header = binwalk.core.compat.str2bytes(fd.read(1024))
fd.close()
......
......@@ -4,5 +4,6 @@
import binwalk.core.plugin
class Unjffs2DepreciatedPlugin(binwalk.core.plugin.Plugin):
pass
import binwalk.core.plugin
class ZipHelperPlugin(binwalk.core.plugin.Plugin):
'''
A helper plugin for Zip files to ensure that the Zip archive
extraction rule is only executed once when the first Zip archive
......
......@@ -4,7 +4,9 @@ import binwalk.core.compat
import binwalk.core.common
import binwalk.core.plugin
class ZLIBExtractPlugin(binwalk.core.plugin.Plugin):
'''
Zlib extractor plugin.
'''
......@@ -26,7 +28,8 @@ class ZLIBExtractPlugin(binwalk.core.plugin.Plugin):
fpin = binwalk.core.common.BlockFile(fname)
fpout = binwalk.core.common.BlockFile(outfile, 'w')
plaintext = zlib.decompress(binwalk.core.compat.str2bytes(fpin.read()))
plaintext = zlib.decompress(
binwalk.core.compat.str2bytes(fpin.read()))
fpout.write(plaintext)
fpin.close()
......@@ -37,4 +40,3 @@ class ZLIBExtractPlugin(binwalk.core.plugin.Plugin):
return False
return True
......@@ -3,7 +3,9 @@ import binwalk.core.compat
import binwalk.core.plugin
from binwalk.core.common import BlockFile
class ZlibValidPlugin(binwalk.core.plugin.Plugin):
'''
Validates zlib compressed data.
'''
......@@ -40,4 +42,3 @@ class ZlibValidPlugin(binwalk.core.plugin.Plugin):
# Error -5, incomplete or truncated data input
if not str(e).startswith("Error -5"):
result.valid = False
......@@ -2,6 +2,7 @@ import idc
import idaapi
import binwalk
class binwalk_t(idaapi.plugin_t):
flags = 0
comment = "Scan the current IDB for file signatures"
......@@ -10,8 +11,10 @@ class binwalk_t(idaapi.plugin_t):
wanted_hotkey = ""
def init(self):
self.menu_context_1 = idaapi.add_menu_item("Search/", "binwalk opcodes", "", 0, self.opcode_scan, (None,))
self.menu_context_2 = idaapi.add_menu_item("Search/", "binwalk signatures", "", 0, self.signature_scan, (None,))
self.menu_context_1 = idaapi.add_menu_item(
"Search/", "binwalk opcodes", "", 0, self.opcode_scan, (None,))
self.menu_context_2 = idaapi.add_menu_item(
"Search/", "binwalk signatures", "", 0, self.signature_scan, (None,))
return idaapi.PLUGIN_KEEP
def term(self):
......@@ -28,6 +31,6 @@ class binwalk_t(idaapi.plugin_t):
def opcode_scan(self, arg):
binwalk.scan(idc.GetIdbPath(), opcode=True)
def PLUGIN_ENTRY():
return binwalk_t()
......@@ -9,7 +9,8 @@ for module in binwalk.scan(*sys.argv[1:], signature=True, quiet=True, extract=Tr
for result in module.results:
if module.extractor.output.has_key(result.file.path):
if module.extractor.output[result.file.path].extracted.has_key(result.offset):
print ("Extracted '%s' at offset 0x%X from '%s' to '%s'" % (result.description.split(',')[0],
result.offset,
result.file.path,
str(module.extractor.output[result.file.path].extracted[result.offset])))
print (
"Extracted '%s' at offset 0x%X from '%s' to '%s'" % (result.description.split(',')[0],
result.offset,
result.file.path,
str(module.extractor.output[result.file.path].extracted[result.offset])))
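Note that dict.has_key() only exists on Python 2; with the in operator the same example works on Python 3 as well, assuming the extractor output mapping behaves like a standard dict:

import sys
import binwalk

for module in binwalk.scan(*sys.argv[1:], signature=True, quiet=True, extract=True):
    for result in module.results:
        if result.file.path in module.extractor.output:
            extracted = module.extractor.output[result.file.path].extracted
            if result.offset in extracted:
                print("Extracted '%s' at offset 0x%X from '%s' to '%s'" % (
                    result.description.split(',')[0],
                    result.offset,
                    result.file.path,
                    str(extracted[result.offset])))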
......@@ -4,10 +4,12 @@ import sys
import binwalk
try:
# Perform a signature scan against the files specified on the command line and suppress the usual binwalk output.
# Perform a signature scan against the files specified on the command line
# and suppress the usual binwalk output.
for module in binwalk.scan(*sys.argv[1:], signature=True, quiet=True):
print ("%s Results:" % module.name)
for result in module.results:
print ("\t%s 0x%.8X %s [%s]" % (result.file.name, result.offset, result.description, str(result.valid)))
print ("\t%s 0x%.8X %s [%s]" % (
result.file.name, result.offset, result.description, str(result.valid)))
except binwalk.ModuleException as e:
pass