Commit afe49f53 by devttys0

Added heuristic entropy module

parent 464cb3b0
......@@ -63,6 +63,14 @@ class Kwarg(object):
self.default = default
self.description = description
class Dependency(object):
    '''
    Describes a single module dependency.

    @attribute - Name of the attribute on the dependent module that will hold the dependency instance.
    @name      - Class name of the module that is depended upon.
    @kwargs    - Custom keyword arguments to load the dependency with; defaults to an empty dictionary.
    '''

    def __init__(self, attribute="", name="", kwargs=None):
        self.attribute = attribute
        self.name = name
        # A literal {} default would be a single dictionary shared by every Dependency
        # created without explicit kwargs; give each instance its own instead.
        self.kwargs = {} if kwargs is None else kwargs
        # Set later, once the dependency's class has been resolved
        self.module = None
class Result(object):
'''
Generic class for storing and accessing scan results.
......@@ -126,7 +134,13 @@ class Module(object):
KWARGS = []
# A dictionary of module dependencies; all modules depend on binwalk.modules.configuration.Configuration
DEPENDS = {'config' : 'Configuration', 'extractor' : 'Extractor'}
#DEPENDS = {'config' : 'Configuration', 'extractor' : 'Extractor'}
DEPENDS = [
Dependency(name='Configuration',
attribute='config'),
Dependency(name='Extractor',
attribute='extractor'),
]
# Format string for printing the header during a scan
#HEADER_FORMAT = "%s\n"
......@@ -158,28 +172,24 @@ class Module(object):
# Modules with a higher order are displayed first in help output
ORDER = 5
def __init__(self, dependency=False, **kwargs):
def __init__(self, **kwargs):
self.errors = []
self.results = []
self.status = None
self.enabled = False
self.name = self.__class__.__name__
self.plugins = binwalk.core.plugin.Plugins(self)
process_kwargs(self, kwargs)
# If the module was loaded as a dependency, don't display or log any results
if dependency:
self.config.display.quiet = True
self.config.display.log = None
self.plugins.load_plugins()
try:
self.load()
except KeyboardInterrupt as e:
raise e
except Exception as e:
self.error(exception=e)
self.plugins.load_plugins()
def __del__(self):
return None
......@@ -296,10 +306,9 @@ class Module(object):
self.validate(r)
self._plugins_result(r)
for (attribute, module) in iterator(self.DEPENDS):
for dependency in self.DEPENDS:
try:
dependency = getattr(self, attribute)
dependency.callback(r)
getattr(self, dependency.attribute).callback(r)
except AttributeError:
continue
......@@ -334,12 +343,12 @@ class Module(object):
self.errors.append(e)
if e.exception:
sys.stderr.write("\n" + e.module.__class__.__name__ + " Exception: " + str(e.exception) + "\n")
sys.stderr.write("\n" + e.module + " Exception: " + str(e.exception) + "\n")
sys.stderr.write("-" * exception_header_width + "\n")
traceback.print_exc(file=sys.stderr)
sys.stderr.write("-" * exception_header_width + "\n\n")
elif e.description:
sys.stderr.write("\n" + e.module.__class__.__name__ + " Error: " + e.description + "\n\n")
sys.stderr.write("\n" + e.module + " Error: " + e.description + "\n\n")
def header(self):
self.config.display.format_strings(self.HEADER_FORMAT, self.RESULT_FORMAT)
......@@ -361,9 +370,9 @@ class Module(object):
self.modules = parent.loaded_modules
# Reset all dependency modules
for (dependency, module) in iterator(self.DEPENDS):
if hasattr(self, dependency):
getattr(self, dependency).reset()
for dependency in self.DEPENDS:
if hasattr(self, dependency.attribute):
getattr(self, dependency.attribute).reset()
try:
self.init()
......@@ -427,6 +436,7 @@ class Modules(object):
'''
self.arguments = []
self.loaded_modules = {}
self.default_dependency_modules = {}
self.status = Status(completed=0, total=0)
self._set_arguments(list(argv), kargv)
......@@ -523,53 +533,65 @@ class Modules(object):
return run_modules
def run(self, module):
obj = self.load(module)
def run(self, module, dependency=False, kwargs={}):
obj = self.load(module, kwargs)
if isinstance(obj, binwalk.core.module.Module) and obj.enabled:
obj.main(parent=self)
self.status.clear()
# Add object to loaded_modules here, that way if a module has already been
# loaded directly and is subsequently also listed as a dependency we don't waste
# time loading it twice.
self.loaded_modules[module] = obj
# If the module is not being loaded as a dependency, add it to the loaded modules dictionary
if not dependency:
self.loaded_modules[module] = obj
if not kwargs and not has_key(self.default_dependency_modules, module):
self.default_dependency_modules[module] = obj
return obj
def load(self, module):
kwargs = self.argv(module, argv=self.arguments)
kwargs.update(self.dependencies(module))
return module(**kwargs)
def load(self, module, kwargs={}):
argv = self.argv(module, argv=self.arguments)
argv.update(kwargs)
argv.update(self.dependencies(module, argv['enabled']))
return module(**argv)
def dependencies(self, module):
def dependencies(self, module, module_enabled):
import binwalk.modules
kwargs = {}
attributes = {}
if hasattr(module, "DEPENDS"):
for (kwarg, dependency) in iterator(module.DEPENDS):
for dependency in module.DEPENDS:
# The dependency module must be imported by binwalk.modules.__init__.py
if hasattr(binwalk.modules, dependency):
dependency = getattr(binwalk.modules, dependency)
if hasattr(binwalk.modules, dependency.name):
dependency.module = getattr(binwalk.modules, dependency.name)
else:
sys.stderr.write("WARNING: %s depends on %s which was not found in binwalk.modules.__init__.py\n" % (str(module), dependency))
continue
raise ModuleException("%s depends on %s which was not found in binwalk.modules.__init__.py\n" % (str(module), dependency.name))
# No recursive dependencies, thanks
if dependency == module:
if dependency.module == module:
continue
if not has_key(self.loaded_modules, dependency):
# self.run will automatically add the dependency class instance to self.loaded_modules
self.run(dependency)
# Only honor custom kwargs from modules that are enabled, else madness ensues.
# Example: Heuristic module depends on entropy module, and sets entropy kwargs to contain 'enabled' : True.
# Without this check, an entropy scan would always be run, even if -H or -E weren't specified!
if module_enabled:
kwargs = dependency.kwargs
else:
kwargs = {}
if not kwargs and has_key(self.default_dependency_modules, dependency.module):
depobj = self.default_dependency_modules[dependency.module]
else:
depobj = self.run(dependency.module, dependency=True, kwargs=kwargs)
# If a dependency failed, consider this a non-recoverable error and raise an exception
if self.loaded_modules[dependency].errors:
raise ModuleException("Failed to load " + str(dependency))
else:
kwargs[kwarg] = self.loaded_modules[dependency]
if depobj.errors:
raise ModuleException("Failed to load " + dependency.name)
else:
attributes[dependency.attribute] = depobj
return kwargs
return attributes
def argv(self, module, argv=sys.argv[1:]):
'''
......@@ -580,7 +602,7 @@ class Modules(object):
Returns a dictionary of kwargs for the specified module.
'''
kwargs = {}
kwargs = {'enabled' : False}
last_priority = {}
longs = []
shorts = ""
......@@ -645,9 +667,6 @@ class Modules(object):
else:
kwargs[name] = value
if not has_key(kwargs, 'enabled'):
kwargs['enabled'] = False
return kwargs
def kwargs(self, module, kwargs):
......
......@@ -5,3 +5,4 @@ from binwalk.modules.hashmatch import HashMatch
from binwalk.modules.configuration import Configuration
from binwalk.modules.extractor import Extractor
from binwalk.modules.entropy import Entropy
from binwalk.modules.heuristics import HeuristicCompressionAnalyzer
......@@ -13,7 +13,7 @@ class Configuration(Module):
TITLE = "General"
ORDER = 0
DEPENDS = {}
DEPENDS = []
CLI = [
Option(long='length',
......
......@@ -42,6 +42,7 @@ class Entropy(Module):
KWARGS = [
Kwarg(name='enabled', default=False),
Kwarg(name='save_plot', default=False),
Kwarg(name='display_results', default=True),
Kwarg(name='do_plot', default=True),
Kwarg(name='show_legend', default=True),
Kwarg(name='block_size', default=1024),
......@@ -53,7 +54,6 @@ class Entropy(Module):
def init(self):
self.HEADER[-1] = "ENTROPY"
self.algorithm = self.shannon
self.display_results = True
self.max_description_length = 0
self.file_markers = {}
......@@ -91,7 +91,7 @@ class Entropy(Module):
if self.display_results:
self.footer()
if not self.save_plot:
if self.do_plot and not self.save_plot:
QtGui.QApplication.instance().exec_()
def calculate_file_entropy(self, fp):
......
#!/usr/bin/env python
# Routines to perform Monte Carlo Pi approximation and Chi Squared tests.
# Used for fingerprinting unknown areas of high entropy (e.g., is this block of high entropy data compressed or encrypted?).
# Inspired by people who actually know what they're doing: http://www.fourmilab.ch/random/
import math
from binwalk.core.compat import *
from binwalk.core.module import Module, Kwarg, Option, Dependency
class ChiSquare(object):
    '''
    Performs a Chi Squared test against the provided data.
    '''

    IDEAL = 256.0

    def __init__(self):
        '''
        Class constructor.

        Returns None.
        '''
        self.bytes = {}
        self.freedom = self.IDEAL - 1

        # Initialize the self.bytes dictionary with keys for all possible byte values (0 - 255)
        for i in range(0, int(self.IDEAL)):
            self.bytes[chr(i)] = 0

        self.reset()

    def reset(self):
        '''
        Clears the accumulated byte counts and the last computed value.

        Returns None.
        '''
        self.xc2 = 0.0
        self.byte_count = 0

        for key in self.bytes.keys():
            self.bytes[key] = 0

    def update(self, data):
        '''
        Updates the current byte counts with new data.

        @data - String of bytes to update.

        Returns None.
        '''
        # Count the number of occurrences of each byte value
        for i in data:
            # Iterating a Python 3 bytes object yields integers; map them onto
            # the single-character keys that self.bytes was initialized with.
            if isinstance(i, int):
                i = chr(i)
            self.bytes[i] += 1

        self.byte_count += len(data)

    def chisq(self):
        '''
        Calculate the Chi Square critical value for the data seen since the last reset.

        Returns the critical value.
        '''
        expected = self.byte_count / self.IDEAL

        # Start from zero on every call so that calling chisq() repeatedly
        # does not pile the same sum on top of the previous result.
        xc2 = 0.0
        if expected:
            for count in self.bytes.values():
                xc2 += ((count - expected) ** 2) / expected

        self.xc2 = xc2
        return self.xc2
class EntropicBlock(object):
    '''
    Simple record describing one block of data: its start, end and length.
    Any keyword argument passed to the constructor is stored as an attribute.
    '''

    def __init__(self, **kwargs):
        # Every known field starts out unset...
        for field in ('start', 'end', 'length'):
            setattr(self, field, None)

        # ...and is then overridden by whatever the caller supplied.
        for (name, value) in iterator(kwargs):
            setattr(self, name, value)
class HeuristicCompressionAnalyzer(Module):
    '''
    Performs analysis and attempts to interpret the results.

    Blocks of data flagged by the entropy module are split into small chunks,
    each chunk is run through a Chi Square test, and the block is reported as
    either compressed or encrypted depending on whether any chunk exceeded
    the Chi Square cutoff.
    '''

    # Size, in bytes, of each chunk fed to the Chi Square test
    BLOCK_SIZE = 32
    # Chunks whose Chi Square value is at or above this are counted as errors
    CHI_CUTOFF = 512
    # Entropy results at or above this value open a new block
    ENTROPY_TRIGGER = .90
    # Blocks shorter than this are not analyzed
    MIN_BLOCK_SIZE = 4096
    # Added to a block's start offset and subtracted from its end offset
    # NOTE(review): presumably tied to the entropy module's default block size of 1024 - confirm
    BLOCK_OFFSET = 1024

    TITLE = "Heuristic Compression"

    DEPENDS = [
            Dependency(name='Configuration',
                       attribute='config'),
            Dependency(name='Entropy',
                       attribute='entropy',
                       kwargs={'enabled' : True, 'do_plot' : False, 'display_results' : False}),
    ]

    CLI = [
            Option(short='H',
                   long='heuristic',
                   kwargs={'enabled' : True},
                   description='Heuristically classify high entropy data'),
    ]

    KWARGS = [
            Kwarg(name='enabled', default=False),
    ]

    def init(self):
        '''
        Builds self.blocks, a dictionary mapping each file name to a list of
        EntropicBlock objects, from the results of the entropy dependency.

        Returns None.
        '''
        self.blocks = {}

        self.HEADER[-1] = "HEURISTIC ENTROPY ANALYSIS"

        for result in self.entropy.results:
            if not has_key(self.blocks, result.file.name):
                self.blocks[result.file.name] = []

            # Entropy rose to the trigger and no block is currently open: open one
            if result.entropy >= self.ENTROPY_TRIGGER and (not self.blocks[result.file.name] or self.blocks[result.file.name][-1].end is not None):
                self.blocks[result.file.name].append(EntropicBlock(start=result.offset + self.BLOCK_OFFSET))
            # Entropy fell below the trigger while a block is open: close it
            elif result.entropy < self.ENTROPY_TRIGGER and self.blocks[result.file.name] and self.blocks[result.file.name][-1].end is None:
                self.blocks[result.file.name][-1].end = result.offset - self.BLOCK_OFFSET

    def run(self):
        '''
        Analyzes every sufficiently large block found for each target file.

        Returns None.
        '''
        for fp in self.config.target_files:

            if has_key(self.blocks, fp.name):

                self.header()

                for block in self.blocks[fp.name]:

                    # A block that was never closed runs to the end of the scanned data
                    if block.end is None:
                        block.length = fp.offset + fp.length - block.start
                    else:
                        block.length = block.end - block.start

                    if block.length >= self.MIN_BLOCK_SIZE:
                        self.analyze(fp, block)

                self.footer()

    def analyze(self, fp, block):
        '''
        Perform analysis and interpretation.

        @fp    - File object to read the block's data from.
        @block - EntropicBlock describing the region to analyze.

        Returns None; the verdict is reported through self.result.
        '''
        i = 0
        num_error = 0

        chi = ChiSquare()
        fp.seek(block.start)

        while i < block.length:
            j = 0
            (d, dlen) = fp.read_block()
            if not d:
                break

            while j < dlen:
                chi.reset()

                data = d[j:j+self.BLOCK_SIZE]
                # Ignore a trailing partial chunk
                if len(data) < self.BLOCK_SIZE:
                    break

                chi.update(data)

                if chi.chisq() >= self.CHI_CUTOFF:
                    num_error += 1

                j += self.BLOCK_SIZE

                # Stop once the end of the block has been passed
                if (j + i) > block.length:
                    break

            i += dlen

        if num_error > 0:
            verdict = 'Moderate entropy data, best guess: compressed'
        else:
            verdict = 'High entropy data, best guess: encrypted'

        desc = '%s, size: %d, %d low entropy blocks' % (verdict, block.length, num_error)
        self.result(offset=block.start, description=desc, file=fp)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment