Commit 51f2dc48 by Craig Heffner

Code formatting, removal of unused code.

No related merge requests found
import os
import sys
import glob
import ctypes
import ctypes.util
import binwalk.core.common
from binwalk.core.compat import *
class Function(object):
    '''
    Container class for defining library functions.

    Attributes set by default:

    @name - Name of the library function (string), None until set.
    @type - Python type of the function's return value (defaults to int).

    Any additional keyword arguments passed to the constructor are set as
    attributes on the instance as-is.
    '''

    def __init__(self, **kwargs):
        '''
        Class constructor.

        @kwargs - Arbitrary attributes (typically 'name' and 'type') to
                  set on this instance, overriding the defaults above.

        Returns None.
        '''
        self.name = None
        self.type = int

        # dict.items() exists on both Python 2 and 3, so the
        # binwalk.core.compat.iterator shim is unnecessary here.
        for (k, v) in kwargs.items():
            setattr(self, k, v)
class FunctionHandler(object):
    '''
    Class for abstracting function calls via ctypes and handling Python 2/3 compatibility issues.
    '''

    # Maps the declared Python return type to the corresponding ctypes
    # restype used when invoking the library function.
    PY2CTYPES = {
        bytes: ctypes.c_char_p,
        str: ctypes.c_char_p,
        int: ctypes.c_int,
        float: ctypes.c_float,
        bool: ctypes.c_int,
        None: ctypes.c_int,
    }

    # Maps the declared Python return type to a callable used to convert
    # the raw ctypes return value into that type.
    RETVAL_CONVERTERS = {
        None: int,
        int: int,
        float: float,
        bool: bool,
        str: bytes2str,
        bytes: str2bytes,
    }

    def __init__(self, library, function):
        '''
        Class constructor.

        @library  - Library handle as returned by ctypes.cdll.LoadLibrary.
        @function - An instance of the binwalk.core.C.Function class.

        Returns None.
        '''
        self.name = function.name
        self.retype = function.type
        self.function = getattr(library, self.name)

        # Plain membership test works on both Python 2 and 3; the
        # binwalk.core.compat.has_key shim is unnecessary here.
        if self.retype in self.PY2CTYPES:
            self.function.restype = self.PY2CTYPES[self.retype]
            self.retval_converter = self.RETVAL_CONVERTERS[self.retype]
        else:
            # Assume the caller supplied a ctypes type directly; the return
            # value is passed through without conversion in that case.
            self.function.restype = self.retype
            self.retval_converter = None

    def run(self, *args):
        '''
        Executes the library function, handling Python 2/3 compatibility and properly converting the return type.

        @*args - Library function arguments.

        Returns the return value from the library function.
        '''
        args = list(args)

        # Python3 expects a bytes object for char *'s, not a str.
        # This allows us to pass either, regardless of the Python version.
        for (i, arg) in enumerate(args):
            if isinstance(arg, str):
                args[i] = str2bytes(arg)

        retval = self.function(*args)
        if self.retval_converter is not None:
            retval = self.retval_converter(retval)

        return retval
class Library(object):
    '''
    Class for loading the specified library via ctypes.
    '''

    def __init__(self, library, functions):
        '''
        Class constructor.

        @library   - Library name (e.g., 'magic' for libmagic), or a list of names.
        @functions - A dictionary of function names and their return types (e.g., {'magic_buffer' : str})

        Returns None.
        '''
        # NOTE(review): the previous code assigned
        # binwalk.core.settings.Settings() to self.settings here, but
        # binwalk.core.settings is never imported by this module and the
        # attribute was never used, so that line raised at runtime and has
        # been removed.
        self.library = ctypes.cdll.LoadLibrary(self.find_library(library))
        if not self.library:
            raise Exception("Failed to load library '%s'" % library)

        # Expose each requested library function as an attribute on this
        # object (e.g., self.magic_buffer(...)), bound to a FunctionHandler
        # that performs argument/return value conversion.
        for function in functions:
            f = FunctionHandler(self.library, function)
            setattr(self, function.name, f.run)

    def find_library(self, libraries):
        '''
        Locates the specified library.

        @libraries - Library name (e.g., 'magic' for libmagic), or a list of names.

        Returns a string to be passed to ctypes.cdll.LoadLibrary.
        '''
        lib_path = None
        prefix = binwalk.core.common.get_libs_path()

        if isinstance(libraries, str):
            libraries = [libraries]

        for library in libraries:
            # Candidate locations for each supported platform.
            system_paths = {
                'linux': [os.path.join(prefix, 'lib%s.so' % library), '/usr/local/lib/lib%s.so' % library],
                'cygwin': [os.path.join(prefix, 'lib%s.so' % library), '/usr/local/lib/lib%s.so' % library],
                'win32': [os.path.join(prefix, 'lib%s.dll' % library), '%s.dll' % library],
                'darwin': [os.path.join(prefix, 'lib%s.dylib' % library),
                           '/opt/local/lib/lib%s.dylib' % library,
                           '/usr/local/lib/lib%s.dylib' % library,
                           ] + glob.glob('/usr/local/Cellar/*%s*/*/lib/lib%s.dylib' % (library, library)),
            }

            # Python 2 reports sys.platform as 'linux2'/'linux3' depending on
            # the kernel version; alias those to the 'linux' path list.
            for i in range(2, 4):
                system_paths['linux%d' % i] = system_paths['linux']

            # Search the common install directories first; these are usually not in the library search path
            # Search these *first*, since a) they are the most likely locations and b) there may be a
            # discrepency between where ctypes.util.find_library and
            # ctypes.cdll.LoadLibrary search for libs.
            for path in system_paths[sys.platform]:
                binwalk.core.common.debug("Searching for '%s'" % path)
                if os.path.exists(path):
                    lib_path = path
                    break

            # If we failed to find the library, check the standard library
            # search paths
            if not lib_path:
                lib_path = ctypes.util.find_library(library)

            # Use the first library that we can find
            if lib_path:
                binwalk.core.common.debug("Found library '%s' at: %s" % (library, lib_path))
                break
            else:
                binwalk.core.common.debug("Could not find library '%s'" % library)

        # If we still couldn't find the library, error out
        if not lib_path:
            raise Exception("Failed to locate libraries '%s'" % str(libraries))

        return lib_path
......@@ -109,13 +109,14 @@ class Display(object):
self._fprint(
"MD5 Checksum: %s\n", [md5sum], csv=False, filter=False)
if self.custom_verbose_format and self.custom_verbose_args:
self._fprint(
self.custom_verbose_format, self.custom_verbose_args, csv=False, filter=False)
self._fprint(self.custom_verbose_format,
self.custom_verbose_args,
csv=False,
filter=False)
self._fprint("%s", "\n", csv=False, filter=False)
self._fprint(self.header_format, args, filter=False)
self._fprint(
"%s", ["-" * self.HEADER_WIDTH + "\n"], csv=False, filter=False)
self._fprint("%s", ["-" * self.HEADER_WIDTH + "\n"], csv=False, filter=False)
def result(self, *args):
# Convert to list for item assignment
......@@ -218,8 +219,7 @@ class Display(object):
# Add any remaining data (guarunteed to be max_line_wrap_length
# long or shorter) to self.string_parts
self._append_to_data_parts(
line, offset, offset + len(line[offset:]))
self._append_to_data_parts(line, offset, offset + len(line[offset:]))
# Append self.string_parts to formatted_line; each part seperated
# by delim
......@@ -244,8 +244,7 @@ class Display(object):
import termios
# Get the terminal window width
hw = struct.unpack(
'hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))
hw = struct.unpack('hh', fcntl.ioctl(1, termios.TIOCGWINSZ, '1234'))
self.SCREEN_WIDTH = self.HEADER_WIDTH = hw[1]
except KeyboardInterrupt as e:
raise e
......
......@@ -164,13 +164,11 @@ class SignatureLine(object):
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise ParserException(
"Failed to expand string '%s' with integer '%s' in line '%s'" % (self.value, n, line))
raise ParserException("Failed to expand string '%s' with integer '%s' in line '%s'" % (self.value, n, line))
try:
self.value = binwalk.core.compat.string_decode(self.value)
except ValueError as e:
raise ParserException(
"Failed to decode string value '%s' in line '%s'" % (self.value, line))
raise ParserException("Failed to decode string value '%s' in line '%s'" % (self.value, line))
# If a regex was specified, compile it
elif self.type == 'regex':
self.regex = True
......@@ -180,21 +178,18 @@ class SignatureLine(object):
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise ParserException(
"Invalid regular expression '%s': %s" % (self.value, str(e)))
raise ParserException("Invalid regular expression '%s': %s" % (self.value, str(e)))
# Non-string types are integer values
else:
try:
self.value = int(self.value, 0)
except ValueError as e:
raise ParserException(
"Failed to convert value '%s' to an integer on line '%s'" % (self.value, line))
raise ParserException("Failed to convert value '%s' to an integer on line '%s'" % (self.value, line))
# Sanity check to make sure the first line of a signature has an
# explicit value
if self.level == 0 and self.value is None:
raise ParserException(
"First element of a signature must specify a non-wildcard value: '%s'" % (line))
raise ParserException("First element of a signature must specify a non-wildcard value: '%s'" % (line))
# Set the size and struct format value for the specified data type.
# This must be done, obviously, after the value has been parsed out
......@@ -232,8 +227,7 @@ class SignatureLine(object):
self.fmt = 'i'
self.size = 4
else:
raise ParserException(
"Unknown data type '%s' in line '%s'" % (self.type, line))
raise ParserException("Unknown data type '%s' in line '%s'" % (self.type, line))
# The struct module uses the same characters for specifying signed and unsigned data types,
# except that signed data types are upper case. The above if-else code sets self.fmt to the
......@@ -374,8 +368,7 @@ class Signature(object):
if not binwalk.core.compat.has_key(line.tags, 'overlap'):
for i in range(1, line.size):
if restr[i:] == restr[0:(line.size - i)]:
binwalk.core.common.warning(
"Signature '%s' is a self-overlapping signature!" % line.text)
binwalk.core.common.warning("Signature '%s' is a self-overlapping signature!" % line.text)
break
return re.compile(re.escape(restr))
......@@ -492,8 +485,7 @@ class Magic(object):
s = expression[:period].rfind('(') + 1
# The offset address may be an evaluatable expression, such as '(4+0.L)', typically the result
# of the original offset being something like '(&0.L)'.
o = binwalk.core.common.MathExpression(
expression[s:period]).value
o = binwalk.core.common.MathExpression(expression[s:period]).value
t = expression[period + 1]
# Re-build just the parsed offset portion of the expression
......@@ -512,24 +504,19 @@ class Magic(object):
try:
# Big and little endian byte format
if t in ['b', 'B']:
v = struct.unpack(
'b', binwalk.core.compat.str2bytes(self.data[o:o + 1]))[0]
v = struct.unpack('b', binwalk.core.compat.str2bytes(self.data[o:o + 1]))[0]
# Little endian short format
elif t == 's':
v = struct.unpack(
'<h', binwalk.core.compat.str2bytes(self.data[o:o + 2]))[0]
v = struct.unpack('<h', binwalk.core.compat.str2bytes(self.data[o:o + 2]))[0]
# Little endian long format
elif t == 'l':
v = struct.unpack(
'<i', binwalk.core.compat.str2bytes(self.data[o:o + 4]))[0]
v = struct.unpack('<i', binwalk.core.compat.str2bytes(self.data[o:o + 4]))[0]
# Big endian short format
elif t == 'S':
v = struct.unpack(
'>h', binwalk.core.compat.str2bytes(self.data[o:o + 2]))[0]
v = struct.unpack('>h', binwalk.core.compat.str2bytes(self.data[o:o + 2]))[0]
# Bit endian long format
elif t == 'L':
v = struct.unpack(
'>i', binwalk.core.compat.str2bytes(self.data[o:o + 4]))[0]
v = struct.unpack('>i', binwalk.core.compat.str2bytes(self.data[o:o + 4]))[0]
# struct.error is thrown if there is not enough bytes in
# self.data for the specified format type
except struct.error as e:
......@@ -589,15 +576,13 @@ class Magic(object):
ple = '%d+' % previous_line_end
# Allow users to use either the '&0' (libmagic) or '&+0' (explcit addition) sytaxes;
# replace both with the ple text.
line_offset_text = line.offset.replace(
'&+', ple).replace('&', ple)
line_offset_text = line.offset.replace('&+', ple).replace('&', ple)
# Evaluate the expression
line_offset = self._do_math(offset, line_offset_text)
# Sanity check
if not isinstance(line_offset, int):
raise ParserException(
"Failed to convert offset '%s' to a number: '%s'" % (line.offset, line.text))
raise ParserException("Failed to convert offset '%s' to a number: '%s'" % (line.offset, line.text))
# The start of the data needed by this line is at offset + line_offset.
# The end of the data will be line.size bytes later.
......@@ -607,8 +592,7 @@ class Magic(object):
# If the line has a packed format string, unpack it
if line.pkfmt:
try:
dvalue = struct.unpack(
line.pkfmt, binwalk.core.compat.str2bytes(self.data[start:end]))[0]
dvalue = struct.unpack(line.pkfmt, binwalk.core.compat.str2bytes(self.data[start:end]))[0]
# Not enough bytes left in self.data for the specified
# format size
except struct.error as e:
......@@ -624,8 +608,7 @@ class Magic(object):
# Else, just terminate the string at the first newline,
# carriage return, or NULL byte
else:
dvalue = self.data[start:end].split(
'\x00')[0].split('\r')[0].split('\n')[0]
dvalue = self.data[start:end].split('\x00')[0].split('\r')[0].split('\n')[0]
# Non-wildcard strings have a known length, specified in
# the signature line
else:
......@@ -665,8 +648,13 @@ class Magic(object):
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise ParserException("Operation '" + str(dvalue) + " " + str(
line.operator) + "= " + str(line.opvalue) + "' failed: " + str(e))
raise ParserException("Operation '" +
str(dvalue) +
" " +
str(line.operator) +
"= " +
str(line.opvalue) +
"' failed: " + str(e))
# Does the data (dvalue) match the specified comparison?
if ((line.value is None) or
......@@ -896,8 +884,7 @@ class Magic(object):
# existing signature entry, something is very wrong with the
# signature file.
else:
raise ParserException(
"Invalid signature line: '%s'" % line)
raise ParserException("Invalid signature line: '%s'" % line)
# Add the final signature to the signature list
if signature:
......
......@@ -498,8 +498,7 @@ class Module(object):
if r.display:
display_args = self._build_display_args(r)
if display_args:
self.config.display.format_strings(
self.HEADER_FORMAT, self.RESULT_FORMAT)
self.config.display.format_strings(self.HEADER_FORMAT, self.RESULT_FORMAT)
self.config.display.result(*display_args)
return r
......@@ -520,14 +519,12 @@ class Module(object):
self.errors.append(e)
if e.exception:
sys.stderr.write(
"\n" + e.module + " Exception: " + str(e.exception) + "\n")
sys.stderr.write("\n" + e.module + " Exception: " + str(e.exception) + "\n")
sys.stderr.write("-" * exception_header_width + "\n")
traceback.print_exc(file=sys.stderr)
sys.stderr.write("-" * exception_header_width + "\n\n")
elif e.description:
sys.stderr.write(
"\n" + e.module + " Error: " + e.description + "\n\n")
sys.stderr.write("\n" + e.module + " Error: " + e.description + "\n\n")
def header(self):
'''
......@@ -535,17 +532,13 @@ class Module(object):
Returns None.
'''
self.config.display.format_strings(
self.HEADER_FORMAT, self.RESULT_FORMAT)
self.config.display.add_custom_header(
self.VERBOSE_FORMAT, self.VERBOSE)
self.config.display.format_strings(self.HEADER_FORMAT, self.RESULT_FORMAT)
self.config.display.add_custom_header(self.VERBOSE_FORMAT, self.VERBOSE)
if type(self.HEADER) == type([]):
self.config.display.header(
*self.HEADER, file_name=self.current_target_file_name)
self.config.display.header(*self.HEADER, file_name=self.current_target_file_name)
elif self.HEADER:
self.config.display.header(
self.HEADER, file_name=self.current_target_file_name)
self.config.display.header(self.HEADER, file_name=self.current_target_file_name)
def footer(self):
'''
......@@ -577,8 +570,7 @@ class Module(object):
self.config.verbose = self.config.display.verbose = True
if not self.config.files:
binwalk.core.common.debug(
"No target files specified, module %s terminated" % self.name)
binwalk.core.common.debug("No target files specified, module %s terminated" % self.name)
return False
self.reset_dependencies()
......@@ -592,8 +584,7 @@ class Module(object):
return False
try:
self.config.display.format_strings(
self.HEADER_FORMAT, self.RESULT_FORMAT)
self.config.display.format_strings(self.HEADER_FORMAT, self.RESULT_FORMAT)
except KeyboardInterrupt as e:
raise e
except Exception as e:
......@@ -648,8 +639,7 @@ class Modules(object):
self.arguments = []
self.executed_modules = {}
self.default_dependency_modules = {}
self.status = Status(
completed=0, total=0, fp=None, running=False, shutdown=False, finished=False)
self.status = Status(completed=0, total=0, fp=None, running=False, shutdown=False, finished=False)
self.status_server_started = False
self.status_service = None
......@@ -723,13 +713,11 @@ class Modules(object):
continue
module_name = file_name[:-3]
try:
user_module = imp.load_source(
module_name, os.path.join(user_modules, file_name))
user_module = imp.load_source(module_name, os.path.join(user_modules, file_name))
except KeyboardInterrupt as e:
raise e
except Exception as e:
binwalk.core.common.warning(
"Error loading module '%s': %s" % (file_name, str(e)))
binwalk.core.common.warning("Error loading module '%s': %s" % (file_name, str(e)))
for (name, module) in inspect.getmembers(user_module):
if inspect.isclass(module) and hasattr(module, attribute):
......@@ -772,8 +760,7 @@ class Modules(object):
short_opt = " "
fmt = " %%s %%s%%-%ds%%s\n" % (25 - len(long_opt))
help_string += fmt % (
short_opt, long_opt, optargs, module_option.description)
help_string += fmt % (short_opt, long_opt, optargs, module_option.description)
return help_string + "\n"
......@@ -855,8 +842,7 @@ class Modules(object):
if hasattr(binwalk.modules, dependency.name):
dependency.module = getattr(binwalk.modules, dependency.name)
else:
raise ModuleException(
"%s depends on %s which was not found in binwalk.modules.__init__.py\n" % (str(module), dependency.name))
raise ModuleException("%s depends on %s which was not found in binwalk.modules.__init__.py\n" % (str(module), dependency.name))
# No recursive dependencies, thanks
if dependency.module == module:
......@@ -869,14 +855,12 @@ class Modules(object):
# Modules that are not enabled (e.g., extraction module) can load any dependency as long as they don't
# set any custom kwargs for those dependencies.
if module_enabled or not dependency.kwargs:
depobj = self.run(
dependency.module, dependency=True, kwargs=dependency.kwargs)
depobj = self.run(dependency.module, dependency=True, kwargs=dependency.kwargs)
# If a dependency failed, consider this a non-recoverable error and
# raise an exception
if depobj.errors:
raise ModuleException(
"Failed to load " + dependency.name + " module")
raise ModuleException("Failed to load " + dependency.name + " module")
else:
attributes[dependency.attribute] = depobj
......@@ -958,8 +942,7 @@ class Modules(object):
last_priority[name] = module_option.priority
try:
kwargs[name] = module_option.convert(
args[module_option.long], default_value)
kwargs[name] = module_option.convert(args[module_option.long], default_value)
except KeyboardInterrupt as e:
raise e
except Exception as e:
......@@ -990,8 +973,7 @@ class Modules(object):
if not hasattr(obj, k):
setattr(obj, k, v)
else:
raise Exception(
"binwalk.core.module.Modules.process_kwargs: %s has no attribute 'KWARGS'" % str(obj))
raise Exception("binwalk.core.module.Modules.process_kwargs: %s has no attribute 'KWARGS'" % str(obj))
def status_server(self, port):
'''
......@@ -1005,11 +987,9 @@ class Modules(object):
if self.status_server_started == False:
self.status_server_started = True
try:
self.status_service = binwalk.core.statuserver.StatusServer(
port, self)
self.status_service = binwalk.core.statuserver.StatusServer(port, self)
except Exception as e:
binwalk.core.common.warning(
"Failed to start status server on port %d: %s" % (port, str(e)))
binwalk.core.common.warning("Failed to start status server on port %d: %s" % (port, str(e)))
def process_kwargs(obj, kwargs):
......
......@@ -119,8 +119,7 @@ class Plugins(object):
except IgnoreFileException as e:
raise e
except Exception as e:
binwalk.core.common.warning(
"%s.%s failed: %s" % (callback.__module__, callback.__name__, e))
binwalk.core.common.warning("%s.%s failed: %s" % (callback.__module__, callback.__name__, e))
def _find_plugin_class(self, plugin):
for (name, klass) in inspect.getmembers(plugin, inspect.isclass):
......@@ -177,8 +176,7 @@ class Plugins(object):
module = file_name[:-len(self.MODULE_EXTENSION)]
try:
plugin = imp.load_source(
module, os.path.join(plugins[key]['path'], file_name))
plugin = imp.load_source(module, os.path.join(plugins[key]['path'], file_name))
plugin_class = self._find_plugin_class(plugin)
plugins[key]['enabled'][module] = True
......@@ -186,8 +184,7 @@ class Plugins(object):
except KeyboardInterrupt as e:
raise e
except Exception as e:
binwalk.core.common.warning(
"Error loading plugin '%s': %s" % (file_name, str(e)))
binwalk.core.common.warning("Error loading plugin '%s': %s" % (file_name, str(e)))
plugins[key]['enabled'][module] = False
try:
......@@ -208,8 +205,7 @@ class Plugins(object):
def _load_plugin_modules(self, plugins):
for module in plugins['modules']:
try:
file_path = os.path.join(
plugins['path'], module + self.MODULE_EXTENSION)
file_path = os.path.join(plugins['path'], module + self.MODULE_EXTENSION)
except KeyboardInterrupt as e:
raise e
except Exception:
......@@ -231,8 +227,7 @@ class Plugins(object):
pass
try:
self.load_file.append(
getattr(class_instance, self.LOADFILE))
self.load_file.append(getattr(class_instance, self.LOADFILE))
except KeyboardInterrupt as e:
raise e
except Exception as e:
......@@ -246,8 +241,7 @@ class Plugins(object):
pass
try:
self.post_scan.append(
getattr(class_instance, self.POSTSCAN))
self.post_scan.append(getattr(class_instance, self.POSTSCAN))
except KeyboardInterrupt as e:
raise e
except Exception as e:
......@@ -263,8 +257,7 @@ class Plugins(object):
except KeyboardInterrupt as e:
raise e
except Exception as e:
binwalk.core.common.warning(
"Failed to load plugin module '%s': %s" % (module, str(e)))
binwalk.core.common.warning("Failed to load plugin module '%s': %s" % (module, str(e)))
def pre_scan_callbacks(self, obj):
return self._call_plugins(self.pre_scan)
......
......@@ -41,25 +41,16 @@ class Settings:
self.system_dir = common.get_module_path()
# Build the paths to all user-specific files
self.user = common.GenericContainer(
binarch=self._user_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(
user_only=True),
extract=self._user_path(
self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
modules=self._user_path(
self.BINWALK_MODULES_DIR),
self.user = common.GenericContainer(binarch=self._user_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(user_only=True),
extract=self._user_path(self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
modules=self._user_path(self.BINWALK_MODULES_DIR),
plugins=self._user_path(self.BINWALK_PLUGINS_DIR))
# Build the paths to all system-wide files
self.system = common.GenericContainer(
binarch=self._system_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(
system_only=True),
extract=self._system_path(
self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
self.system = common.GenericContainer(binarch=self._system_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE),
magic=self._magic_signature_files(system_only=True),
extract=self._system_path(self.BINWALK_CONFIG_DIR, self.EXTRACT_FILE),
plugins=self._system_path(self.BINWALK_PLUGINS_DIR))
def _magic_signature_files(self, system_only=False, user_only=False):
......@@ -72,18 +63,15 @@ class Settings:
Returns a list of user/system magic signature files.
'''
files = []
user_binarch = self._user_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
system_binarch = self._system_path(
self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
user_binarch = self._user_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
system_binarch = self._system_path(self.BINWALK_MAGIC_DIR, self.BINARCH_MAGIC_FILE)
def list_files(dir_path):
# Ignore hidden dotfiles.
return [os.path.join(dir_path, x) for x in os.listdir(dir_path) if not x.startswith('.')]
if not system_only:
user_dir = os.path.join(
self.user_dir, self.BINWALK_USER_DIR, self.BINWALK_MAGIC_DIR)
user_dir = os.path.join(self.user_dir, self.BINWALK_USER_DIR, self.BINWALK_MAGIC_DIR)
files += list_files(user_dir)
if not user_only:
system_dir = os.path.join(self.system_dir, self.BINWALK_MAGIC_DIR)
......
......@@ -28,22 +28,17 @@ class StatusRequestHandler(SocketServer.BaseRequestHandler):
time.sleep(0.1)
try:
self.request.send(
binwalk.core.compat.str2bytes('\b' * last_status_message_len))
self.request.send(
binwalk.core.compat.str2bytes(' ' * last_status_message_len))
self.request.send(
binwalk.core.compat.str2bytes('\b' * last_status_message_len))
self.request.send(binwalk.core.compat.str2bytes('\b' * last_status_message_len))
self.request.send(binwalk.core.compat.str2bytes(' ' * last_status_message_len))
self.request.send(binwalk.core.compat.str2bytes('\b' * last_status_message_len))
if self.server.binwalk.status.shutdown:
self.server.binwalk.status.finished = True
break
if self.server.binwalk.status.total != 0:
percentage = (
(float(self.server.binwalk.status.completed) / float(self.server.binwalk.status.total)) * 100)
status_message = message_format % (
self.server.binwalk.status.fp.path,
percentage = ((float(self.server.binwalk.status.completed) / float(self.server.binwalk.status.total)) * 100)
status_message = message_format % (self.server.binwalk.status.fp.path,
percentage,
self.server.binwalk.status.completed,
self.server.binwalk.status.total)
......@@ -53,15 +48,13 @@ class StatusRequestHandler(SocketServer.BaseRequestHandler):
continue
last_status_message_len = len(status_message)
self.request.send(
binwalk.core.compat.str2bytes(status_message))
self.request.send(binwalk.core.compat.str2bytes(status_message))
message_sent = True
except IOError as e:
if e.errno == errno.EPIPE:
break
except Exception as e:
binwalk.core.common.debug(
'StatusRequestHandler exception: ' + str(e) + '\n')
binwalk.core.common.debug('StatusRequestHandler exception: ' + str(e) + '\n')
except KeyboardInterrupt as e:
raise e
......@@ -77,8 +70,7 @@ class ThreadedStatusServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
class StatusServer(object):
def __init__(self, port, binwalk):
self.server = ThreadedStatusServer(
('127.0.0.1', port), StatusRequestHandler)
self.server = ThreadedStatusServer(('127.0.0.1', port), StatusRequestHandler)
self.server.binwalk = binwalk
t = threading.Thread(target=self.server.serve_forever)
......
......@@ -15,8 +15,3 @@ from binwalk.modules.hexdiff import HexDiff
from binwalk.modules.general import General
from binwalk.modules.extractor import Extractor
from binwalk.modules.entropy import Entropy
# These are depreciated.
# from binwalk.modules.binvis import Plotter
# from binwalk.modules.hashmatch import HashMatch
# from binwalk.modules.heuristics import HeuristicCompressionAnalyzer
# Generates 3D visualizations of input files.
import os
from binwalk.core.compat import *
from binwalk.core.common import BlockFile
from binwalk.core.module import Module, Option, Kwarg
class Plotter(Module):
'''
Base class for visualizing binaries in Qt.
Other plotter classes are derived from this.
'''
VIEW_DISTANCE = 1024
MAX_2D_PLOT_POINTS = 12500
MAX_3D_PLOT_POINTS = 25000
TITLE = "Binary Visualization"
CLI = [
Option(short='3',
long='3D',
kwargs={'axis': 3, 'enabled': True},
description='Generate a 3D binary visualization'),
Option(short='2',
long='2D',
kwargs={'axis': 2, 'enabled': True},
description='Project data points onto 3D cube walls only'),
Option(short='V',
long='points',
type=int,
kwargs={'max_points': 0},
description='Set the maximum number of plotted data points'),
# Option(short='V',
# long='grids',
# kwargs={'show_grids' : True},
# description='Display the x-y-z grids in the resulting plot'),
]
KWARGS = [
Kwarg(name='axis', default=3),
Kwarg(name='max_points', default=0),
Kwarg(name='show_grids', default=False),
Kwarg(name='enabled', default=False),
]
# There isn't really any useful data to print to console. Disable header
# and result output.
HEADER = None
RESULT = None
def init(self):
import pyqtgraph.opengl as gl
from pyqtgraph.Qt import QtGui
self.verbose = self.config.verbose
self.offset = self.config.offset
self.length = self.config.length
self.plane_count = -1
self.plot_points = None
if self.axis == 2:
self.MAX_PLOT_POINTS = self.MAX_2D_PLOT_POINTS
self._generate_data_point = self._generate_2d_data_point
elif self.axis == 3:
self.MAX_PLOT_POINTS = self.MAX_3D_PLOT_POINTS
self._generate_data_point = self._generate_3d_data_point
else:
raise Exception(
"Invalid Plotter axis specified: %d. Must be one of: [2,3]" % self.axis)
if not self.max_points:
self.max_points = self.MAX_PLOT_POINTS
self.app = QtGui.QApplication([])
self.window = gl.GLViewWidget()
self.window.opts['distance'] = self.VIEW_DISTANCE
if len(self.config.target_files) == 1:
self.window.setWindowTitle(self.config.target_files[0])
def _print(self, message):
'''
Print console messages. For internal use only.
'''
if self.verbose:
print(message)
def _generate_plot_points(self, data_points):
'''
Generates plot points from a list of data points.
@data_points - A dictionary containing each unique point and its frequency of occurance.
Returns a set of plot points.
'''
total = 0
min_weight = 0
weightings = {}
plot_points = {}
# If the number of data points exceeds the maximum number of allowed data points, use a
# weighting system to eliminate data points that occur less freqently.
if sum(data_points.values()) > self.max_points:
# First, generate a set of weight values 1 - 10
for i in range(1, 11):
weightings[i] = 0
# Go through every data point and how many times that point occurs
for (point, count) in iterator(data_points):
# For each data point, compare it to each remaining weight
# value
for w in get_keys(weightings):
# If the number of times this data point occurred is >= the weight value,
# then increment the weight value. Since weight values are ordered lowest
# to highest, this means that more frequent data points also increment lower
# weight values. Thus, the more high-frequency data points there are, the
# more lower-frequency data points are eliminated.
if count >= w:
weightings[w] += 1
else:
break
# Throw out weight values that exceed the maximum number of
# data points
if weightings[w] > self.max_points:
del weightings[w]
# If there's only one weight value left, no sense in continuing
# the loop...
if len(weightings) == 1:
break
# The least weighted value is our minimum weight
min_weight = min(weightings)
# Get rid of all data points that occur less frequently than our
# minimum weight
for point in get_keys(data_points):
if data_points[point] < min_weight:
del data_points[point]
for point in sorted(data_points, key=data_points.get, reverse=True):
plot_points[point] = data_points[point]
# Register this as a result in case future modules need access to the raw point information,
# but mark plot as False to prevent the entropy module from
# attempting to overlay this data on its graph.
self.result(point=point, plot=False)
total += 1
if total >= self.max_points:
break
return plot_points
def _generate_data_point(self, data):
'''
Subclasses must override this to return the appropriate data point.
@data - A string of data self.axis in length.
Returns a data point tuple.
'''
return (0, 0, 0)
def _generate_data_points(self, fp):
'''
Generates a dictionary of data points and their frequency of occurrance.
@fp - The BlockFile object to generate data points from.
Returns a dictionary.
'''
i = 0
data_points = {}
self._print("Generating data points for %s" % fp.name)
# We don't need any extra data from BlockFile
fp.set_block_size(peek=0)
while True:
(data, dlen) = fp.read_block()
if not data or not dlen:
break
i = 0
while (i + (self.axis - 1)) < dlen:
point = self._generate_data_point(data[i:i + self.axis])
if has_key(data_points, point):
data_points[point] += 1
else:
data_points[point] = 1
i += 3
return data_points
def _generate_plot(self, plot_points):
import numpy as np
import pyqtgraph.opengl as gl
nitems = float(len(plot_points))
pos = np.empty((nitems, 3))
size = np.empty((nitems))
color = np.empty((nitems, 4))
i = 0
for (point, weight) in iterator(plot_points):
r = 0.0
g = 0.0
b = 0.0
pos[i] = point
frequency_percentage = (weight / nitems)
# Give points that occur more frequently a brighter color and larger point size.
# Frequency is determined as a percentage of total unique data
# points.
if frequency_percentage > .010:
size[i] = .20
r = 1.0
elif frequency_percentage > .005:
size[i] = .15
b = 1.0
elif frequency_percentage > .002:
size[i] = .10
g = 1.0
r = 1.0
else:
size[i] = .05
g = 1.0
color[i] = (r, g, b, 1.0)
i += 1
scatter_plot = gl.GLScatterPlotItem(
pos=pos, size=size, color=color, pxMode=False)
scatter_plot.translate(-127.5, -127.5, -127.5)
return scatter_plot
def plot(self, wait=True):
import pyqtgraph.opengl as gl
self.window.show()
if self.show_grids:
xgrid = gl.GLGridItem()
ygrid = gl.GLGridItem()
zgrid = gl.GLGridItem()
self.window.addItem(xgrid)
self.window.addItem(ygrid)
self.window.addItem(zgrid)
# Rotate x and y grids to face the correct direction
xgrid.rotate(90, 0, 1, 0)
ygrid.rotate(90, 1, 0, 0)
# Scale grids to the appropriate dimensions
xgrid.scale(12.8, 12.8, 12.8)
ygrid.scale(12.8, 12.8, 12.8)
zgrid.scale(12.8, 12.8, 12.8)
for fd in iter(self.next_file, None):
data_points = self._generate_data_points(fd)
self._print("Generating plot points from %d data points" %
len(data_points))
self.plot_points = self._generate_plot_points(data_points)
del data_points
self._print("Generating graph from %d plot points" %
len(self.plot_points))
self.window.addItem(self._generate_plot(self.plot_points))
if wait:
self.wait()
def wait(self):
    '''
    Block in the Qt event loop until the plot window is closed.

    Returns None.
    '''
    from pyqtgraph.Qt import QtCore, QtGui

    # A running timer keeps the event loop servicing events periodically.
    timer = QtCore.QTimer()
    timer.start(50)

    QtGui.QApplication.instance().exec_()
def _generate_3d_data_point(self, data):
'''
Plot data points within a 3D cube.
'''
return (ord(data[0]), ord(data[1]), ord(data[2]))
def _generate_2d_data_point(self, data):
'''
Plot data points projected on each cube face.
'''
self.plane_count += 1
if self.plane_count > 5:
self.plane_count = 0
if self.plane_count == 0:
return (0, ord(data[0]), ord(data[1]))
elif self.plane_count == 1:
return (ord(data[0]), 0, ord(data[1]))
elif self.plane_count == 2:
return (ord(data[0]), ord(data[1]), 0)
elif self.plane_count == 3:
return (255, ord(data[0]), ord(data[1]))
elif self.plane_count == 4:
return (ord(data[0]), 255, ord(data[1]))
elif self.plane_count == 5:
return (ord(data[0]), ord(data[1]), 255)
def run(self):
    '''
    Module entry point: render the plot and block until done.

    Returns True.
    '''
    self.plot()
    return True
......@@ -37,8 +37,7 @@ class LZMA(object):
# Add an extraction rule
if self.module.extractor.enabled:
self.module.extractor.add_rule(
regex='^%s' % self.DESCRIPTION.lower(), extension="7z", cmd=self.extractor)
self.module.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="7z", cmd=self.extractor)
def extractor(self, file_name):
# Open and read the file containing the raw compressed data.
......@@ -53,8 +52,7 @@ class LZMA(object):
# and a fake output file size field.
header = chr(self.properties) + \
self.dictionaries[-1] + ("\xFF" * 8)
binwalk.core.common.BlockFile(
file_name, "wb").write(header + compressed_data)
binwalk.core.common.BlockFile(file_name, "wb").write(header + compressed_data)
# Try to extract it with all the normal lzma extractors until one
# works
......@@ -83,8 +81,7 @@ class LZMA(object):
def parse_header(self, header):
(pb, lp, lc) = self.parse_property(header[0])
dictionary = struct.unpack(
"<I", binwalk.core.compat.str2bytes(header[1:5]))[0]
dictionary = struct.unpack("<I", binwalk.core.compat.str2bytes(header[1:5]))[0]
return LZMAHeader(pb=pb, lp=lp, lc=lc, dictionary=dictionary)
def build_properties(self):
......@@ -107,12 +104,10 @@ class LZMA(object):
if self.module.partial_scan == True:
# For partial scans, only use the largest dictionary value
self.dictionaries.append(
binwalk.core.compat.bytes2str(struct.pack("<I", 2 ** 25)))
self.dictionaries.append(binwalk.core.compat.bytes2str(struct.pack("<I", 2 ** 25)))
else:
for n in range(16, 26):
self.dictionaries.append(
binwalk.core.compat.bytes2str(struct.pack("<I", 2 ** n)))
self.dictionaries.append(binwalk.core.compat.bytes2str(struct.pack("<I", 2 ** n)))
def build_headers(self):
self.headers = set()
......@@ -148,8 +143,7 @@ class LZMA(object):
break
if result is not None:
self.properties = self.build_property(
result.pb, result.lp, result.lc)
self.properties = self.build_property(result.pb, result.lp, result.lc)
description = "%s, properties: 0x%.2X [pb: %d, lp: %d, lc: %d], dictionary size: %d" % (self.DESCRIPTION,
self.properties,
result.pb,
......@@ -175,8 +169,7 @@ class Deflate(object):
# Add an extraction rule
if self.module.extractor.enabled:
self.module.extractor.add_rule(
regex='^%s' % self.DESCRIPTION.lower(), extension="deflate", cmd=self.extractor)
self.module.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="deflate", cmd=self.extractor)
def extractor(self, file_name):
in_data = ""
......@@ -193,8 +186,7 @@ class Deflate(object):
in_data += data[:dlen]
try:
out_data = zlib.decompress(
binwalk.core.compat.str2bytes(in_data), -15)
out_data = zlib.decompress(binwalk.core.compat.str2bytes(in_data), -15)
with binwalk.core.common.BlockFile(out_file, 'w') as fp_out:
fp_out.write(out_data)
retval = True
......@@ -275,11 +267,9 @@ class RawCompression(Module):
for i in range(0, dlen):
for decompressor in self.decompressors:
description = decompressor.decompress(
data[i:i + decompressor.BLOCK_SIZE])
description = decompressor.decompress(data[i:i + decompressor.BLOCK_SIZE])
if description:
self.result(
description=description, file=fp, offset=fp.tell() - dlen + i)
self.result(description=description, file=fp, offset=fp.tell() - dlen + i)
if self.stop_on_first_hit:
file_done = True
break
......
......@@ -99,8 +99,7 @@ class Disasm(Module):
self.disasm_data_size = self.min_insn_count * 10
for arch in self.ARCHITECTURES:
self.disassemblers.append(
(capstone.Cs(arch.type, (arch.mode + arch.endianess)), arch.description))
self.disassemblers.append((capstone.Cs(arch.type, (arch.mode + arch.endianess)), arch.description))
def scan_file(self, fp):
total_read = 0
......@@ -124,18 +123,15 @@ class Disasm(Module):
# Don't pass the entire data block into disasm_lite, it's horribly inefficient
# to pass large strings around in Python. Break it up into
# smaller code blocks instead.
code_block = binwalk.core.compat.str2bytes(
data[block_offset:block_offset + self.disasm_data_size])
code_block = binwalk.core.compat.str2bytes(data[block_offset:block_offset + self.disasm_data_size])
# If this code block doesn't contain at least two different bytes, skip it
# to prevent false positives (e.g., "\x00\x00\x00\x00" is a
# nop in MIPS).
if len(set(code_block)) >= 2:
for (md, description) in self.disassemblers:
insns = [insn for insn in md.disasm_lite(
code_block, (total_read + block_offset))]
binwalk.core.common.debug(
"0x%.8X %s, at least %d valid instructions" % ((total_read + block_offset),
insns = [insn for insn in md.disasm_lite(code_block, (total_read + block_offset))]
binwalk.core.common.debug("0x%.8X %s, at least %d valid instructions" % ((total_read + block_offset),
description,
len(insns)))
......@@ -150,8 +146,7 @@ class Disasm(Module):
if result.count >= self.THRESHOLD:
break
else:
result = ArchResult(
offset=total_read +
result = ArchResult(offset=total_read +
block_offset + fp.offset,
description=description,
insns=insns,
......@@ -168,8 +163,7 @@ class Disasm(Module):
if r.valid and r.display:
if self.config.verbose:
for (position, size, mnem, opnds) in result.insns:
self.result(
offset=position, file=fp, description="%s %s" % (mnem, opnds))
self.result(offset=position, file=fp, description="%s %s" % (mnem, opnds))
if not self.keep_going:
return
......
......@@ -102,8 +102,7 @@ class Entropy(Module):
if len(description) > self.max_description_length:
self.max_description_length = len(description)
self.file_markers[result.file.name].append(
(result.offset, description))
self.file_markers[result.file.name].append((result.offset, description))
# If other modules have been run and they produced results, don't spam
# the terminal with entropy results
......@@ -157,8 +156,7 @@ class Entropy(Module):
if self.block_size is None:
block_size = fp.size / self.DEFAULT_DATA_POINTS
# Round up to the nearest DEFAULT_BLOCK_SIZE (1024)
block_size = int(
block_size + ((self.DEFAULT_BLOCK_SIZE - block_size) % self.DEFAULT_BLOCK_SIZE))
block_size = int(block_size + ((self.DEFAULT_BLOCK_SIZE - block_size) % self.DEFAULT_BLOCK_SIZE))
else:
block_size = self.block_size
......@@ -240,8 +238,7 @@ class Entropy(Module):
'''
# Entropy is a simple ratio of: <zlib compressed size> / <original
# size>
e = float(
float(len(zlib.compress(str2bytes(data), 9))) / float(len(data)))
e = float(float(len(zlib.compress(str2bytes(data), 9))) / float(len(data)))
if truncate and e > 1.0:
e = 1.0
......
......@@ -165,11 +165,9 @@ class Extractor(Module):
fp.close()
self.pending.append(f)
except IOError as e:
binwalk.core.common.warning(
"Ignoring file '%s': %s" % (f, str(e)))
binwalk.core.common.warning("Ignoring file '%s': %s" % (f, str(e)))
else:
binwalk.core.common.warning(
"Ignoring file '%s': Not a regular file" % f)
binwalk.core.common.warning("Ignoring file '%s': Not a regular file" % f)
def reset(self):
# Holds a list of pending files that should be scanned; only populated
......@@ -441,8 +439,7 @@ class Extractor(Module):
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise Exception(
"Extractor.load_from_file failed to load file '%s': %s" % (fname, str(e)))
raise Exception("Extractor.load_from_file failed to load file '%s': %s" % (fname, str(e)))
def load_defaults(self):
'''
......@@ -464,8 +461,7 @@ class Extractor(Module):
raise e
except Exception as e:
if binwalk.core.common.DEBUG:
raise Exception(
"Extractor.load_defaults failed to load file '%s': %s" % (extract_file, str(e)))
raise Exception("Extractor.load_defaults failed to load file '%s': %s" % (extract_file, str(e)))
def get_output_directory_override(self):
'''
......@@ -516,19 +512,16 @@ class Extractor(Module):
subdir = ""
if self.output_directory_override:
output_directory = os.path.join(
self.directory, subdir, self.output_directory_override)
output_directory = os.path.join(self.directory, subdir, self.output_directory_override)
else:
outdir = os.path.join(self.directory, subdir, '_' + basename)
output_directory = unique_file_name(
outdir, extension='extracted')
output_directory = unique_file_name(outdir, extension='extracted')
if not os.path.exists(output_directory):
os.mkdir(output_directory)
self.extraction_directories[path] = output_directory
self.output[path].directory = os.path.realpath(
output_directory) + os.path.sep
self.output[path].directory = os.path.realpath(output_directory) + os.path.sep
# Else, just use the already created directory
else:
output_directory = self.extraction_directories[path]
......@@ -719,8 +712,7 @@ class Extractor(Module):
try:
codes[i] = int(codes[i], 0)
except ValueError as e:
binwalk.core.common.warning(
"The specified return code '%s' for extractor '%s' is not a valid number!" % (codes[i], values[0]))
binwalk.core.common.warning("The specified return code '%s' for extractor '%s' is not a valid number!" % (codes[i], values[0]))
values[3] = codes
if len(values) >= 5:
......@@ -827,8 +819,7 @@ class Extractor(Module):
except KeyboardInterrupt as e:
raise e
except Exception as e:
binwalk.core.common.warning(
"Internal extractor '%s' failed with exception: '%s'" % (str(cmd), str(e)))
binwalk.core.common.warning("Internal extractor '%s' failed with exception: '%s'" % (str(cmd), str(e)))
elif cmd:
# If not in debug mode, create a temporary file to redirect
# stdout and stderr to
......@@ -840,31 +831,25 @@ class Extractor(Module):
while self.UNIQUE_PATH_DELIMITER in cmd:
need_unique_path = cmd.split(self.UNIQUE_PATH_DELIMITER)[
1].split(self.UNIQUE_PATH_DELIMITER)[0]
unique_path = binwalk.core.common.unique_file_name(
need_unique_path)
cmd = cmd.replace(
self.UNIQUE_PATH_DELIMITER + need_unique_path + self.UNIQUE_PATH_DELIMITER, unique_path)
unique_path = binwalk.core.common.unique_file_name(need_unique_path)
cmd = cmd.replace(self.UNIQUE_PATH_DELIMITER + need_unique_path + self.UNIQUE_PATH_DELIMITER, unique_path)
# Execute.
for command in cmd.split("&&"):
# Replace all instances of FILE_NAME_PLACEHOLDER in the
# command with fname
command = command.strip().replace(
self.FILE_NAME_PLACEHOLDER, fname)
command = command.strip().replace(self.FILE_NAME_PLACEHOLDER, fname)
binwalk.core.common.debug(
"subprocess.call(%s, stdout=%s, stderr=%s)" % (command, str(tmp), str(tmp)))
rval = subprocess.call(
shlex.split(command), stdout=tmp, stderr=tmp)
binwalk.core.common.debug("subprocess.call(%s, stdout=%s, stderr=%s)" % (command, str(tmp), str(tmp)))
rval = subprocess.call(shlex.split(command), stdout=tmp, stderr=tmp)
if rval in codes:
retval = True
else:
retval = False
binwalk.core.common.debug(
'External extractor command "%s" completed with return code %d (success: %s)' % (cmd, rval, str(retval)))
binwalk.core.common.debug('External extractor command "%s" completed with return code %d (success: %s)' % (cmd, rval, str(retval)))
# TODO: Should errors from all commands in a command string be checked? Currently we only support
# specifying one set of error codes, so at the moment, this is not done; it is up to the
......@@ -876,8 +861,7 @@ class Extractor(Module):
except KeyboardInterrupt as e:
raise e
except Exception as e:
binwalk.core.common.warning(
"Extractor.execute failed to run external extractor '%s': %s, '%s' might not be installed correctly" % (str(cmd), str(e), str(cmd)))
binwalk.core.common.warning("Extractor.execute failed to run external extractor '%s': %s, '%s' might not be installed correctly" % (str(cmd), str(e), str(cmd)))
retval = None
if tmp is not None:
......
......@@ -134,11 +134,9 @@ class General(Module):
# Build file name filter regex rules
if self.file_name_include_regex:
self.file_name_include_regex = re.compile(
self.file_name_include_regex)
self.file_name_include_regex = re.compile(self.file_name_include_regex)
if self.file_name_exclude_regex:
self.file_name_exclude_regex = re.compile(
self.file_name_exclude_regex)
self.file_name_exclude_regex = re.compile(self.file_name_exclude_regex)
self.settings = binwalk.core.settings.Settings()
self.display = binwalk.core.display.Display(log=self.log_file,
......
# Performs fuzzy hashing against files/directories.
# Unlike other scans, this doesn't produce any file offsets, so its results are not applicable to
# some other scans, such as the entropy scan.
# Additionally, this module currently doesn't support certian general options (length, offset, swap, etc),
# as the libfuzzy C library is responsible for opening and scanning the
# specified files.
import os
import re
import ctypes
import fnmatch
import binwalk.core.C
import binwalk.core.common
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg
class HashResult(object):
    '''
    Class for storing libfuzzy hash results.
    For internal use only.
    '''

    def __init__(self, name, hash=None, strings=None):
        # name    - path of the hashed file
        # hash    - the computed fuzzy hash buffer, if any
        # strings - the file's extracted printable strings, if any
        self.name, self.hash, self.strings = name, hash, strings
class HashMatch(Module):
    '''
    Class for fuzzy hash matching of files and directories.
    '''

    DEFAULT_CUTOFF = 0
    CONSERVATIVE_CUTOFF = 90

    TITLE = "Fuzzy Hash"

    CLI = [
        Option(short='F',
               long='fuzzy',
               kwargs={'enabled': True},
               description='Perform fuzzy hash matching on files/directories'),
        Option(short='u',
               long='cutoff',
               priority=100,
               type=int,
               kwargs={'cutoff': DEFAULT_CUTOFF},
               description='Set the cutoff percentage'),
        Option(short='S',
               long='strings',
               kwargs={'strings': True},
               description='Diff strings inside files instead of the entire file'),
        Option(short='s',
               long='same',
               kwargs={'same': True, 'cutoff': CONSERVATIVE_CUTOFF},
               description='Only show files that are the same'),
        Option(short='p',
               long='diff',
               kwargs={'same': False, 'cutoff': CONSERVATIVE_CUTOFF},
               description='Only show files that are different'),
        Option(short='n',
               long='name',
               kwargs={'filter_by_name': True},
               description='Only compare files whose base names are the same'),
        Option(short='L',
               long='symlinks',
               kwargs={'symlinks': True},
               description="Don't ignore symlinks"),
    ]

    # Bug fix: 'symlinks' was previously listed twice in KWARGS; the
    # duplicate entry has been removed.
    KWARGS = [
        Kwarg(name='cutoff', default=DEFAULT_CUTOFF),
        Kwarg(name='strings', default=False),
        Kwarg(name='same', default=True),
        Kwarg(name='symlinks', default=False),
        Kwarg(name='max_results', default=None),
        Kwarg(name='abspath', default=False),
        Kwarg(name='filter_by_name', default=False),
        Kwarg(name='enabled', default=False),
    ]

    LIBRARY_NAME = "fuzzy"
    LIBRARY_FUNCTIONS = [
        binwalk.core.C.Function(name="fuzzy_hash_buf", type=int),
        binwalk.core.C.Function(name="fuzzy_hash_filename", type=int),
        binwalk.core.C.Function(name="fuzzy_compare", type=int),
    ]

    # Max result is 148 (http://ssdeep.sourceforge.net/api/html/fuzzy_8h.html)
    FUZZY_MAX_RESULT = 150
    # Files smaller than this won't produce meaningful fuzzy results (from
    # ssdeep.h)
    FUZZY_MIN_FILE_SIZE = 4096

    HEADER_FORMAT = "\n%s" + " " * 11 + "%s\n"
    RESULT_FORMAT = "%d%%" + " " * 17 + "%s\n"
    HEADER = ["SIMILARITY", "FILE NAME"]
    RESULT = ["percentage", "description"]

    def init(self):
        # Running count of displayed results, bounded by self.max_results.
        self.total = 0
        # Cache of the most recently hashed file on each side of the diff,
        # so repeated comparisons against the same file skip re-hashing.
        self.last_file1 = HashResult(None)
        self.last_file2 = HashResult(None)

        self.lib = binwalk.core.C.Library(self.LIBRARY_NAME, self.LIBRARY_FUNCTIONS)

    def _get_strings(self, fname):
        # Concatenate all printable strings (minimum 10 chars) from the file.
        return ''.join(list(binwalk.core.common.strings(fname, minimum=10)))

    def _show_result(self, match, fname):
        '''
        Display one match result, right-aligning the percentage column.
        '''
        if self.abspath:
            fname = os.path.abspath(fname)

        # Add description string padding for alignment
        if match < 100:
            fname = ' ' + fname
        if match < 10:
            fname = ' ' + fname

        self.result(percentage=match, description=fname, plot=False)

    def _compare_files(self, file1, file2):
        '''
        Fuzzy diff two files.

        @file1 - The first file to diff.
        @file2 - The second file to diff.

        Returns the match percentage.
        Returns None on error.
        '''
        status = 0
        file1_dup = False
        file2_dup = False

        if not self.filter_by_name or os.path.basename(file1) == os.path.basename(file2):
            if os.path.exists(file1) and os.path.exists(file2):

                hash1 = ctypes.create_string_buffer(self.FUZZY_MAX_RESULT)
                hash2 = ctypes.create_string_buffer(self.FUZZY_MAX_RESULT)

                # Check if the last file1 or file2 matches this file1 or file2;
                # no need to re-hash if they match.
                if file1 == self.last_file1.name and self.last_file1.hash:
                    file1_dup = True
                else:
                    self.last_file1.name = file1

                if file2 == self.last_file2.name and self.last_file2.hash:
                    file2_dup = True
                else:
                    self.last_file2.name = file2

                try:
                    if self.strings:
                        # String mode: hash the printable strings rather than
                        # the raw file contents.
                        if file1_dup:
                            file1_strings = self.last_file1.strings
                        else:
                            self.last_file1.strings = file1_strings = self._get_strings(file1)

                        if file2_dup:
                            file2_strings = self.last_file2.strings
                        else:
                            self.last_file2.strings = file2_strings = self._get_strings(file2)

                        if file1_strings == file2_strings:
                            return 100
                        else:
                            if file1_dup:
                                hash1 = self.last_file1.hash
                            else:
                                status |= self.lib.fuzzy_hash_buf(file1_strings, len(file1_strings), hash1)

                            if file2_dup:
                                hash2 = self.last_file2.hash
                            else:
                                status |= self.lib.fuzzy_hash_buf(file2_strings, len(file2_strings), hash2)
                    else:
                        if file1_dup:
                            hash1 = self.last_file1.hash
                        else:
                            status |= self.lib.fuzzy_hash_filename(file1, hash1)

                        if file2_dup:
                            hash2 = self.last_file2.hash
                        else:
                            status |= self.lib.fuzzy_hash_filename(file2, hash2)

                    if status == 0:
                        # Hashing succeeded; cache the fresh hashes for reuse.
                        if not file1_dup:
                            self.last_file1.hash = hash1
                        if not file2_dup:
                            self.last_file2.hash = hash2

                        if hash1.raw == hash2.raw:
                            return 100
                        else:
                            return self.lib.fuzzy_compare(hash1, hash2)
                except Exception as e:
                    binwalk.core.common.warning("Exception while doing fuzzy hash: %s" % str(e))

        return None

    def is_match(self, match):
        '''
        Returns True if this is a good match.
        Returns False if his is not a good match.
        '''
        return (match is not None and ((match >= self.cutoff and self.same) or (match < self.cutoff and not self.same)))

    def _get_file_list(self, directory):
        '''
        Generates a directory tree.

        @directory - The root directory to start from.

        Returns a set of file paths, excluding the root directory.
        '''
        file_list = []

        # Normalize directory path so that we can exclude it from each
        # individual file path
        directory = os.path.abspath(directory) + os.path.sep

        for (root, dirs, files) in os.walk(directory):
            # Don't include the root directory in the file paths
            relative_root = ''.join(root.split(directory, 1)[1:])

            # Get a list of files, with or without symlinks as specified during
            # __init__.
            # Bug fix: os.path.islink must be given the full path; previously
            # it was passed the bare file name, so the symlink check was
            # evaluated against the current working directory and never
            # actually filtered symlinks.
            for f in files:
                if self.symlinks or not os.path.islink(os.path.join(root, f)):
                    file_list.append(os.path.join(relative_root, f))

        return set(file_list)

    def hash_files(self, needle, haystack):
        '''
        Compare one file against a list of other files.

        Returns None; matches are displayed as they are found.
        '''
        self.total = 0

        for f in haystack:
            m = self._compare_files(needle, f)
            if m is not None and self.is_match(m):
                self._show_result(m, f)

                self.total += 1
                if self.max_results and self.total >= self.max_results:
                    break

    def hash_file(self, needle, haystack):
        '''
        Search for one file inside one or more directories.

        Returns a list of tuple results.
        '''
        matching_files = []
        self.total = 0
        done = False

        for directory in haystack:
            for f in self._get_file_list(directory):
                f = os.path.join(directory, f)
                m = self._compare_files(needle, f)
                if m is not None and self.is_match(m):
                    self._show_result(m, f)
                    matching_files.append((m, f))

                    self.total += 1
                    if self.max_results and self.total >= self.max_results:
                        done = True
                        break
            if done:
                break

        return matching_files

    def hash_directories(self, needle, haystack):
        '''
        Compare the contents of one directory with the contents of other directories.

        Returns None; matches are displayed as they are found.
        '''
        done = False
        self.total = 0

        source_files = self._get_file_list(needle)

        for directory in haystack:
            dir_files = self._get_file_list(directory)

            for source_file in source_files:
                for dir_file in dir_files:
                    file1 = os.path.join(needle, source_file)
                    file2 = os.path.join(directory, dir_file)

                    m = self._compare_files(file1, file2)
                    if m is not None and self.is_match(m):
                        self._show_result(m, "%s => %s" % (file1, file2))

                        self.total += 1
                        if self.max_results and self.total >= self.max_results:
                            done = True
                            break
                if done:
                    break
            # Bug fix: previously only the two inner loops terminated once
            # max_results was reached; the directory loop kept walking and
            # hashing every remaining directory.
            if done:
                break

    def run(self):
        '''
        Main module method.
        '''
        # Access the raw self.config.files list directly here, since we accept both
        # files and directories and self.next_file only works for files.
        needle = self.config.files[0]
        haystack = self.config.files[1:]

        self.header()

        if os.path.isfile(needle):
            # Guard against an empty haystack (only one file specified);
            # previously this raised an IndexError on haystack[0].
            if haystack and os.path.isfile(haystack[0]):
                self.hash_files(needle, haystack)
            else:
                self.hash_file(needle, haystack)
        else:
            self.hash_directories(needle, haystack)

        self.footer()

        return True
# Routines to perform Chi Squared tests.
# Used for fingerprinting unknown areas of high entropy (e.g., is this block of high entropy data compressed or encrypted?).
# Inspired by people who actually know what they're doing:
# http://www.fourmilab.ch/random/
import math
from binwalk.core.compat import *
from binwalk.core.module import Module, Kwarg, Option, Dependency
class ChiSquare(object):
    '''
    Performs a Chi Squared test against the provided data.
    '''

    # Number of possible byte values; also the divisor for the expected
    # uniform distribution.
    IDEAL = 256.0

    def __init__(self):
        '''
        Class constructor.

        Returns None.
        '''
        # One counter per possible byte value (0 - 255), keyed by char.
        self.bytes = {chr(value): 0 for value in range(int(self.IDEAL))}
        self.freedom = self.IDEAL - 1

        self.reset()

    def reset(self):
        # Clear the running critical value and every byte counter.
        self.xc2 = 0.0
        self.byte_count = 0

        for key in self.bytes:
            self.bytes[key] = 0

    def update(self, data):
        '''
        Updates the current byte counts with new data.

        @data - String of bytes to update.

        Returns None.
        '''
        # Count the number of occurances of each byte value
        for byte in data:
            self.bytes[byte] += 1

        self.byte_count += len(data)

    def chisq(self):
        '''
        Calculate the Chi Square critical value.

        Returns the critical value.
        '''
        expected = self.byte_count / self.IDEAL

        if expected:
            self.xc2 += sum(((observed - expected) ** 2) / expected
                            for observed in self.bytes.values())

        return self.xc2
class EntropyBlock(object):
    '''
    Simple record describing one contiguous block of high-entropy data.
    start/end are file offsets; length is filled in later as end - start.
    '''

    def __init__(self, **kwargs):
        self.start = None
        self.end = None
        self.length = None

        # Allow any field to be set directly via keyword arguments.
        for key in kwargs:
            setattr(self, key, kwargs[key])
class HeuristicCompressionAnalyzer(Module):
    '''
    Performs analysis and attempts to interpret the results.
    '''

    # Default chi-square window size, in bytes.
    BLOCK_SIZE = 32
    # Chi-square critical value above which a window is counted as an error
    # (i.e., as deviating from a uniform byte distribution).
    CHI_CUTOFF = 512
    # Default entropy level above which a block is analyzed.
    ENTROPY_TRIGGER = .90
    # High-entropy blocks smaller than this are skipped.
    MIN_BLOCK_SIZE = 4096
    # Margin, in bytes, trimmed from each end of a detected block.
    BLOCK_OFFSET = 1024
    # Block size handed to the Entropy dependency module.
    ENTROPY_BLOCK_SIZE = 1024

    TITLE = "Heuristic Compression"

    # Run the Entropy module first (without plotting or printing) and make
    # its results available as self.entropy.
    DEPENDS = [
        Dependency(name='Entropy',
                   attribute='entropy',
                   kwargs={
                       'enabled': True, 'do_plot': False, 'display_results': False, 'block_size': ENTROPY_BLOCK_SIZE}),
    ]

    CLI = [
        Option(short='H',
               long='heuristic',
               kwargs={'enabled': True},
               description='Heuristically classify high entropy data'),
        Option(short='a',
               long='trigger',
               kwargs={'trigger_level': 0},
               type=float,
               description='Set the entropy trigger level (0.0 - 1.0, default: %.2f)' % ENTROPY_TRIGGER),
    ]

    KWARGS = [
        Kwarg(name='enabled', default=False),
        Kwarg(name='trigger_level', default=ENTROPY_TRIGGER),
    ]

    def init(self):
        '''
        Collect high-entropy regions from the Entropy module's results,
        grouped per file name in self.blocks.
        '''
        # Maps file name -> list of EntropyBlock instances.
        self.blocks = {}

        self.HEADER[-1] = "HEURISTIC ENTROPY ANALYSIS"

        # Trigger level sanity check
        if self.trigger_level > 1.0:
            self.trigger_level = 1.0
        elif self.trigger_level < 0.0:
            self.trigger_level = 0.0

        if self.config.block:
            self.block_size = self.config.block
        else:
            self.block_size = self.BLOCK_SIZE

        # Walk the entropy results in order, opening a new block when the
        # entropy rises above the trigger level and closing the current
        # block when it falls back below it. BLOCK_OFFSET trims the fuzzy
        # edges of each region.
        for result in self.entropy.results:
            if not has_key(self.blocks, result.file.name):
                self.blocks[result.file.name] = []

            if result.entropy >= self.trigger_level and (not self.blocks[result.file.name] or self.blocks[result.file.name][-1].end is not None):
                self.blocks[result.file.name].append(
                    EntropyBlock(start=result.offset + self.BLOCK_OFFSET))
            elif result.entropy < self.trigger_level and self.blocks[result.file.name] and self.blocks[result.file.name][-1].end is None:
                self.blocks[result.file.name][
                    -1].end = result.offset - self.BLOCK_OFFSET

    def run(self):
        '''
        Analyze every sufficiently large high-entropy block of each file.
        '''
        for fp in iter(self.next_file, None):
            if has_key(self.blocks, fp.name):
                self.header()

                for block in self.blocks[fp.name]:
                    # A block with no recorded end extends to the end of
                    # the scanned region.
                    if block.end is None:
                        block.length = fp.offset + fp.length - block.start
                    else:
                        block.length = block.end - block.start

                    if block.length >= self.MIN_BLOCK_SIZE:
                        self.analyze(fp, block)

                self.footer()

    def analyze(self, fp, block):
        '''
        Perform analysis and interpretation.
        '''
        i = 0
        num_error = 0
        analyzer_results = []

        chi = ChiSquare()
        fp.seek(block.start)

        # Read the block in chunks, running an independent chi-square test
        # on each self.block_size window and counting windows that deviate
        # strongly from a uniform distribution.
        while i < block.length:
            j = 0
            (d, dlen) = fp.read_block()
            if dlen < 1:
                break

            while j < dlen:
                chi.reset()

                data = d[j:j + self.block_size]
                if len(data) < self.block_size:
                    break

                chi.update(data)

                if chi.chisq() >= self.CHI_CUTOFF:
                    num_error += 1

                j += self.block_size

                if (j + i) > block.length:
                    break

            i += dlen

        # Encrypted data should look uniformly random in every window;
        # compressed data typically shows at least some structure.
        if num_error > 0:
            verdict = 'Moderate entropy data, best guess: compressed'
        else:
            verdict = 'High entropy data, best guess: encrypted'

        desc = '%s, size: %d, %d low entropy blocks' % (
            verdict, block.length, num_error)
        self.result(offset=block.start, description=desc, file=fp)
......@@ -149,8 +149,7 @@ class HexDiff(Module):
hexbyte = "XX"
asciibyte = "."
else:
(hexbyte, asciibyte) = self.hexascii(
block_data, block_data[fp][i], i)
(hexbyte, asciibyte) = self.hexascii(block_data, block_data[fp][i], i)
hexline += "%s " % hexbyte
asciiline += "%s" % asciibyte
......
......@@ -15,8 +15,7 @@ class ArcadyanDeobfuscator(binwalk.core.plugin.Plugin):
BLOCK_SIZE = 32
BLOCK1_OFFSET = 4
BLOCK2_OFFSET = 0x68
MIN_FILE_SIZE = (
OBFUSCATION_MAGIC_SIZE + BLOCK2_OFFSET + BLOCK_SIZE)
MIN_FILE_SIZE = (OBFUSCATION_MAGIC_SIZE + BLOCK2_OFFSET + BLOCK_SIZE)
BLOCK1_START = BLOCK1_OFFSET
BLOCK1_END = BLOCK1_START + BLOCK_SIZE
......@@ -34,8 +33,7 @@ class ArcadyanDeobfuscator(binwalk.core.plugin.Plugin):
def init(self):
if self.module.extractor.enabled:
self.module.extractor.add_rule(
regex="^obfuscated arcadyan firmware",
self.module.extractor.add_rule(regex="^obfuscated arcadyan firmware",
extension="obfuscated",
cmd=self.extractor)
......@@ -59,8 +57,7 @@ class ArcadyanDeobfuscator(binwalk.core.plugin.Plugin):
# Nibble-swap each byte in block 1
nswap = ''
for i in range(self.BLOCK1_START, self.BLOCK1_END):
nswap += chr(((ord(deobfuscated[i]) & 0x0F) << 4) + (
(ord(deobfuscated[i]) & 0xF0) >> 4))
nswap += chr(((ord(deobfuscated[i]) & 0x0F) << 4) + ((ord(deobfuscated[i]) & 0xF0) >> 4))
deobfuscated = deobfuscated[
self.P1_START:self.P1_END] + nswap + deobfuscated[self.BLOCK1_END:]
......@@ -74,8 +71,7 @@ class ArcadyanDeobfuscator(binwalk.core.plugin.Plugin):
self.P1_START:self.P1_END] + bswap + deobfuscated[self.BLOCK1_END:]
if deobfuscated:
out = binwalk.core.common.BlockFile(
(os.path.splitext(fname)[0] + '.deobfuscated'), "wb")
out = binwalk.core.common.BlockFile((os.path.splitext(fname)[0] + '.deobfuscated'), "wb")
out.write(deobfuscated)
out.close()
return True
......
# import binwalk.core.C
import binwalk.core.plugin
# from binwalk.core.common import *
class CompressdPlugin(binwalk.core.plugin.Plugin):
    # '''
    # Searches for and validates compress'd data.
    # '''

    # Applies to results produced by the Signature module.
    MODULES = ['Signature']

    # READ_SIZE = 64

    # COMPRESS42 = "compress42"
    # COMPRESS42_FUNCTIONS = [
    # binwalk.core.C.Function(name="is_compressed", type=bool),
    #]

    # comp = None

    # def init(self):
    # self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)

    # This plugin is currently disabled due to the need to move away from supporting C
    # libraries and into a pure Python project, for cross-platform support and ease of
    # installation / package maintenance. A Python implementation will likely need to
    # be custom developed in the future, but for now, since this compression format is
    # not very common, especially in firmware, simply disable it.
    # self.comp = None

    # def scan(self, result):
    # if self.comp and result.file and result.description.lower().startswith("compress'd data"):
    # fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.READ_SIZE)
    # compressed_data = fd.read(self.READ_SIZE)
    # fd.close()
    # if not self.comp.is_compressed(compressed_data, len(compressed_data)):
    # result.valid = False
......@@ -44,8 +44,7 @@ class CPIOPlugin(binwalk.core.plugin.Plugin):
return False
try:
result = subprocess.call(
['cpio', '-d', '-i', '--no-absolute-filenames'],
result = subprocess.call(['cpio', '-d', '-i', '--no-absolute-filenames'],
stdin=fpin,
stderr=fperr,
stdout=fperr)
......
......@@ -11,8 +11,7 @@ except ImportError as e:
class RomFSCommon(object):
def _read_next_word(self):
value = struct.unpack(
"%sL" % self.endianess, self.data[self.index:self.index + 4])[0]
value = struct.unpack("%sL" % self.endianess, self.data[self.index:self.index + 4])[0]
self.index += 4
return value
......@@ -152,8 +151,7 @@ class RomFS(object):
while True:
try:
entry = RomFSEntry(
self.data[offset:offset + self.FILE_ENTRY_SIZE], endianess=self.endianess)
entry = RomFSEntry(self.data[offset:offset + self.FILE_ENTRY_SIZE], endianess=self.endianess)
except ValueError as e:
break
......@@ -168,8 +166,7 @@ class RomFS(object):
if entry.type & entry.DIR_STRUCT_MASK:
entries[entry.uid].type = "directory"
ds = RomFSDirStruct(
self.data[entry.offset:entry.offset + entry.size], endianess=self.endianess)
ds = RomFSDirStruct(self.data[entry.offset:entry.offset + entry.size], endianess=self.endianess)
for (uid, name) in ds.ls:
if not uid in entries:
entries[uid] = FileContainer()
......
......@@ -17,8 +17,7 @@ class GzipValidPlugin(binwalk.core.plugin.Plugin):
# If this result is a gzip signature match, try to decompress the data
if result.file and result.description.lower().startswith('gzip'):
# Seek to and read the suspected gzip data
fd = self.module.config.open_file(
result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
......
......@@ -31,8 +31,7 @@ class HilinkDecryptor(binwalk.core.plugin.Plugin):
if self.enabled is True and self.module.extractor.enabled is True:
# Add an extraction rule for encrypted Hilink firmware signature
# results
self.module.extractor.add_rule(
regex="^%s" % self.SIGNATURE_DESCRIPTION,
self.module.extractor.add_rule(regex="^%s" % self.SIGNATURE_DESCRIPTION,
extension="enc",
cmd=self._decrypt_and_extract)
......@@ -72,25 +71,20 @@ class HilinkDecryptor(binwalk.core.plugin.Plugin):
if result.description.lower().startswith(self.SIGNATURE_DESCRIPTION) is True:
# Read in the first 64 bytes of the suspected encrypted
# uImage header
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
encrypted_header_data = binwalk.core.compat.str2bytes(
fd.read(64))
fd = self.module.config.open_file(result.file.name, offset=result.offset)
encrypted_header_data = binwalk.core.compat.str2bytes(fd.read(64))
fd.close()
# Decrypt the header
decrypted_header_data = self._hilink_decrypt(
encrypted_header_data)
decrypted_header_data = self._hilink_decrypt(encrypted_header_data)
# Pull out the image size and image name fields from the decrypted uImage header
# and add them to the printed description.
result.size = struct.unpack(
b">L", decrypted_header_data[12:16])[0]
result.size = struct.unpack(b">L", decrypted_header_data[12:16])[0]
result.description += ", size: %d" % (result.size)
# NOTE: The description field should be 32 bytes? Hilink seems to use only 24 bytes for this field,
# even though the header size is still 64 bytes?
result.description += ', image name: "%s"' % binwalk.core.compat.bytes2str(
decrypted_header_data[32:56]).strip("\x00")
result.description += ', image name: "%s"' % binwalk.core.compat.bytes2str(decrypted_header_data[32:56]).strip("\x00")
# Do some basic validation on the decrypted size and image
# name fields
......
......@@ -25,8 +25,7 @@ class JFFS2ValidPlugin(binwalk.core.plugin.Plugin):
header_crc = struct.unpack("<I", node_header[8:12])[0]
# Calculate the actual CRC
calculated_header_crc = (
binascii.crc32(node_header[0:8], -1) ^ -1) & 0xffffffff
calculated_header_crc = (binascii.crc32(node_header[0:8], -1) ^ -1) & 0xffffffff
# Make sure they match
return (header_crc == calculated_header_crc)
......@@ -35,8 +34,7 @@ class JFFS2ValidPlugin(binwalk.core.plugin.Plugin):
if result.file and result.description.lower().startswith('jffs2 filesystem'):
# Seek to and read the suspected JFFS2 node header
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
fd = self.module.config.open_file(result.file.name, offset=result.offset)
# JFFS2 headers are only 12 bytes in size, but reading larger amounts of
# data from disk speeds up repeated disk access and decreases performance
# hits (disk caching?).
......
......@@ -63,5 +63,4 @@ class LZMAModPlugin(binwalk.core.plugin.Plugin):
if result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if "properties: 0x5D" in result.description and "invalid dictionary size" not in result.description:
result.valid = True
result.description = result.description.split(
"invalid uncompressed size")[0] + "missing uncompressed size"
result.description = result.description.split("invalid uncompressed size")[0] + "missing uncompressed size"
......@@ -52,8 +52,7 @@ class LZMAPlugin(binwalk.core.plugin.Plugin):
if result.valid and result.file and result.description.lower().startswith('lzma compressed data'):
# Seek to and read the suspected lzma data
fd = self.module.config.open_file(
result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
......
......@@ -42,8 +42,7 @@ class TarPlugin(binwalk.core.plugin.Plugin):
if result.description.lower().startswith('posix tar archive'):
is_tar = True
file_offset = result.offset
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
fd = self.module.config.open_file(result.file.name, offset=result.offset)
while is_tar:
# read in the tar header struct
......@@ -55,8 +54,7 @@ class TarPlugin(binwalk.core.plugin.Plugin):
# include header)
try:
size = self.nti(buf[124:136])
blocks = math.ceil(
size / float(self.TAR_BLOCKSIZE)) + 1
blocks = math.ceil(size / float(self.TAR_BLOCKSIZE)) + 1
except ValueError as e:
is_tar = False
break
......
......@@ -56,8 +56,7 @@ class UBIValidPlugin(binwalk.core.plugin.Plugin):
def scan(self, result):
if result.file and result.description.lower().startswith('ubi erase count header'):
# Seek to and read the suspected UBI erase count header
fd = self.module.config.open_file(
result.file.name, offset=result.offset)
fd = self.module.config.open_file(result.file.name, offset=result.offset)
ec_header = binwalk.core.compat.str2bytes(fd.read(1024))
fd.close()
......
......@@ -28,8 +28,7 @@ class ZLIBExtractPlugin(binwalk.core.plugin.Plugin):
fpin = binwalk.core.common.BlockFile(fname)
fpout = binwalk.core.common.BlockFile(outfile, 'w')
plaintext = zlib.decompress(
binwalk.core.compat.str2bytes(fpin.read()))
plaintext = zlib.decompress(binwalk.core.compat.str2bytes(fpin.read()))
fpout.write(plaintext)
fpin.close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment