Commit e9d9055a by devttys0

Restructured binwalk directory

parent 659126cb
from binwalk.module import Modules
from binwalk.core.module import Modules
# Contains all the command line options and usage output for the binwalk script.
# Placed here so that other scripts can programmatically access the command line options list (e.g., for auto-completion generation).
import os
import sys
import binwalk.config
# getopt-compatible option definitions for the binwalk script.
# In short_options, a trailing ':' means the option takes an argument;
# in long_options, a trailing '=' means the same.
short_options = "23AaBbCcdEeGHhIiJkLMNnOPpQqrSTtUuVvWwz?D:F:f:g:j:K:o:l:m:R:s:X:x:Y:y:Z:"

long_options = [
        "2D",
        "3D",
        "3d",
        "rm",
        "help",
        "green",
        "red",
        "blue",
        "rehash",
        "examples",
        "quiet",
        "csv",
        "verbose",
        "opcodes",
        "cast",
        "update",
        "binwalk",
        "keep-going",
        "show-invalid",
        "show-grids",
        "ignore-time-skew",
        "honor-footers",
        "profile",
        "delay", # delay is deprecated, but kept for backwards compatibility
        "skip-unopened",
        "term",
        "tim",
        "terse",
        "diff",
        "dumb",
        "entropy",
        "heuristic",
        "math",
        "gzip",
        "save-plot",
        "no-plot",
        "no-legend",
        "strings",
        "carve",
        # Options below this point take an argument (trailing '=')
        "max-points=",
        "matryoshka=",
        "list-plugins",
        "disable-plugins",
        "disable-plugin=",
        "enable-plugin=",
        "max-size=",
        "marker=",
        "strlen=",
        "file=",
        "block=",
        "offset=",
        "length=",
        "exclude=",
        "include=",
        "search=",
        "extract=",
        "dd=",
        "grep=",
        "magic=",
        "raw-bytes=",
]
def usage(fd):
    '''
    Writes the binwalk usage/help text to fd, then exits the process.

    @fd - An object with a write method (e.g., sys.stdout, sys.stderr).

    Never returns; exits with status 1 if fd is sys.stderr, else 0.
    '''
    # NOTE(review): the lines below reference binwalk.entropy, binwalk.plotter
    # and binwalk.hexdiff, which are not imported at the top of this file;
    # presumably they are exposed via the binwalk package — confirm against
    # binwalk/__init__.py.
    fd.write("\n")
    fd.write("Binwalk v%s\n" % binwalk.config.Config.VERSION)
    fd.write("Craig Heffner, http://www.devttys0.com\n")
    fd.write("\n")
    fd.write("Usage: %s [OPTIONS] [FILE1] [FILE2] [FILE3] ...\n" % os.path.basename(sys.argv[0]))
    fd.write("\n")
    fd.write("Signature Analysis:\n")
    fd.write("\t-B, --binwalk Perform a file signature scan (default)\n")
    fd.write("\t-R, --raw-bytes=<string> Search for a custom signature\n")
    fd.write("\t-A, --opcodes Scan for executable code signatures\n")
    fd.write("\t-C, --cast Cast file contents as various data types\n")
    fd.write("\t-m, --magic=<file> Specify an alternate magic file to use\n")
    fd.write("\t-x, --exclude=<filter> Exclude matches that have <filter> in their description\n")
    fd.write("\t-y, --include=<filter> Only search for matches that have <filter> in their description\n")
    fd.write("\t-I, --show-invalid Show results marked as invalid\n")
    fd.write("\t-T, --ignore-time-skew Do not show results that have timestamps more than 1 year in the future\n")
    fd.write("\t-k, --keep-going Show all matching results at a given offset, not just the first one\n")
    fd.write("\t-b, --dumb Disable smart signature keywords\n")
    fd.write("\n")
    fd.write("Strings Analysis:\n")
    fd.write("\t-S, --strings Scan for ASCII strings (may be combined with -B, -R, -A, or -E)\n")
    fd.write("\t-s, --strlen=<n> Set the minimum string length to search for (default: 3)\n")
    fd.write("\n")
    fd.write("Entropy Analysis:\n")
    fd.write("\t-E, --entropy Plot file entropy (may be combined with -B, -R, -A, or -S)\n")
    fd.write("\t-H, --heuristic Identify unknown compression/encryption based on entropy heuristics (implies -E)\n")
    fd.write("\t-K, --block=<int> Set the block size for entropy analysis (default: %d)\n" % binwalk.entropy.FileEntropy.DEFAULT_BLOCK_SIZE)
    fd.write("\t-a, --gzip Use gzip compression ratios to measure entropy\n")
    fd.write("\t-N, --no-plot Do not generate an entropy plot graph\n")
    fd.write("\t-F, --marker=<offset:name> Add a marker to the entropy plot graph\n")
    fd.write("\t-Q, --no-legend Omit the legend from the entropy plot graph\n")
    fd.write("\t-J, --save-plot Save plot as a PNG (implied if multiple files are specified)\n")
    fd.write("\n")
    fd.write("Binary Visualization:\n")
    fd.write("\t-3, --3D Generate a 3D binary visualization\n")
    fd.write("\t-2, --2D Project data points onto 3D cube walls only\n")
    # Bug fix: help text previously read "defulat" instead of "default"
    fd.write("\t-Z, --max-points Set the maximum number of plotted data points (default: %d)\n" % binwalk.plotter.Plotter.MAX_PLOT_POINTS)
    fd.write("\t-V, --show-grids Display the x-y-z grids in the resulting plot\n")
    fd.write("\n")
    fd.write("Binary Diffing:\n")
    fd.write("\t-W, --diff Hexdump / diff the specified files\n")
    fd.write("\t-K, --block=<int> Number of bytes to display per line (default: %d)\n" % binwalk.hexdiff.HexDiff.DEFAULT_BLOCK_SIZE)
    fd.write("\t-G, --green Only show hex dump lines that contain bytes which were the same in all files\n")
    fd.write("\t-i, --red Only show hex dump lines that contain bytes which were different in all files\n")
    fd.write("\t-U, --blue Only show hex dump lines that contain bytes which were different in some files\n")
    fd.write("\t-w, --terse Diff all files, but only display a hex dump of the first file\n")
    fd.write("\n")
    fd.write("Extraction Options:\n")
    fd.write("\t-D, --dd=<type:ext:cmd> Extract <type> signatures, give the files an extension of <ext>, and execute <cmd>\n")
    fd.write("\t-e, --extract=[file] Automatically extract known file types; load rules from file, if specified\n")
    fd.write("\t-M, --matryoshka=[n] Recursively scan extracted files, up to n levels deep (8 levels of recursion is the default)\n")
    fd.write("\t-j, --max-size=<int> Limit extracted file sizes (default: no limit)\n")
    fd.write("\t-r, --rm Cleanup extracted files and zero-size files\n")
    fd.write("\t-d, --honor-footers Only extract files up to their corresponding footer signatures\n")
    fd.write("\t-z, --carve Carve data from files, but don't execute extraction utilities (implies -d)\n")
    fd.write("\t-P, --rehash Recursively diff data extracted from FILE1 with the data extracted from all other files.\n")
    fd.write("\n")
    fd.write("Plugin Options:\n")
    fd.write("\t-X, --disable-plugin=<name> Disable a plugin by name\n")
    fd.write("\t-Y, --enable-plugin=<name> Enable a plugin by name\n")
    fd.write("\t-p, --disable-plugins Do not load any binwalk plugins\n")
    fd.write("\t-L, --list-plugins List all user and system plugins by name\n")
    fd.write("\n")
    fd.write("General Options:\n")
    fd.write("\t-o, --offset=<int> Start scan at this file offset\n")
    fd.write("\t-l, --length=<int> Number of bytes to scan\n")
    fd.write("\t-g, --grep=<text> Grep results for the specified text\n")
    fd.write("\t-f, --file=<file> Log results to file\n")
    fd.write("\t-c, --csv Log results to file in csv format\n")
    fd.write("\t-O, --skip-unopened Ignore file open errors and process only the files that can be opened\n")
    fd.write("\t-t, --term Format output to fit the terminal window\n")
    fd.write("\t-q, --quiet Suppress output to stdout\n")
    fd.write("\t-v, --verbose Be verbose (specify twice for very verbose)\n")
    fd.write("\t-u, --update Update magic signature files\n")
    fd.write("\t-?, --examples Show example usage\n")
    fd.write("\t-h, --help Show help output\n")
    fd.write("\n")

    # Usage requested via stderr implies an error condition; exit non-zero
    if fd == sys.stderr:
        sys.exit(1)
    else:
        sys.exit(0)
#!/usr/bin/env python
# Routines to perform Monte Carlo Pi approximation and Chi Squared tests.
# Used for fingerprinting unknown areas of high entropy (e.g., is this block of high entropy data compressed or encrypted?).
# Inspired by people who actually know what they're doing: http://www.fourmilab.ch/random/
import math
import binwalk.common as common
from binwalk.compat import *
class MonteCarloPi(object):
    '''
    Performs a Monte Carlo Pi approximation.
    Currently unused.
    '''

    def __init__(self):
        '''
        Class constructor.

        Returns None.
        '''
        self.reset()

    def reset(self):
        '''
        Reset state to the beginning.
        '''
        # pi    - The current approximation of Pi
        # error - The percent deviation of self.pi from math.pi
        # m     - Number of points that fell inside the circle
        # n     - Total number of points processed
        self.pi = 0
        self.error = 0
        self.m = 0
        self.n = 0

    def update(self, data):
        '''
        Update the pi approximation with new data.

        @data - A string of bytes to update (length must be >= 6).

        Returns None.
        '''
        c = 0
        dlen = len(data)

        # Bug fix: this was '(c+6) < dlen', which skipped the final complete
        # 6-byte point; a 6-byte input (explicitly allowed by the docstring)
        # produced no points at all.
        while (c+6) <= dlen:
            # Treat 3 bytes as an x coordinate, the next 3 bytes as a y coordinate.
            # Our box is 1x1, so divide by 2^24 to put the x y values inside the box.
            x = ((ord(data[c]) << 16) + (ord(data[c+1]) << 8) + ord(data[c+2])) / 16777216.0
            c += 3
            y = ((ord(data[c]) << 16) + (ord(data[c+1]) << 8) + ord(data[c+2])) / 16777216.0
            c += 3

            # Does the x,y point lie inside the quarter circle of radius 1
            # centered at the origin? (Its area is pi/4 of the unit box.)
            if ((x**2) + (y**2)) <= 1:
                self.m += 1
            self.n += 1

    def montecarlo(self):
        '''
        Approximates the value of Pi based on the provided data.

        Returns a tuple of (approximated value of pi, percent deviation).
        '''
        # pi ~= 4 * (points inside circle) / (total points)
        if self.n:
            self.pi = (float(self.m) / float(self.n) * 4.0)

        if self.pi:
            self.error = math.fabs(1.0 - (math.pi / self.pi)) * 100.0
            return (self.pi, self.error)
        else:
            return (0.0, 0.0)
class ChiSquare(object):
    '''
    Performs a Chi Squared test against the provided data.
    '''

    # Ideal number of distinct byte values (and expected-count divisor)
    IDEAL = 256.0

    def __init__(self):
        '''
        Class constructor.

        Returns None.
        '''
        self.freedom = self.IDEAL - 1

        # One counter per possible byte value (0 - 255), keyed by character
        self.bytes = {chr(value): 0 for value in range(0, int(self.IDEAL))}

        self.reset()

    def reset(self):
        # Zero the running statistic and every byte counter in place
        self.xc2 = 0.0
        self.byte_count = 0

        for key in self.bytes:
            self.bytes[key] = 0

    def update(self, data):
        '''
        Updates the current byte counts with new data.

        @data - String of bytes to update.

        Returns None.
        '''
        self.byte_count += len(data)

        # Tally how many times each byte value occurs
        for byte in data:
            self.bytes[byte] += 1

    def chisq(self):
        '''
        Calculate the Chi Square critical value.

        Returns the critical value.
        '''
        expected = self.byte_count / self.IDEAL

        # A zero expected count means no data has been seen; leave xc2 as-is
        if expected:
            self.xc2 += sum(((count - expected) ** 2) / expected for count in self.bytes.values())

        return self.xc2
class CompressionEntropyAnalyzer(object):
    '''
    Class wrapper around ChiSquare.
    Performs analysis and attempts to interpret the results: blocks whose
    chi square value exceeds CHI_CUTOFF are counted, and the count is used
    to guess whether the data is compressed or encrypted.
    '''

    # Number of bytes fed to each individual chi square test
    BLOCK_SIZE = 32
    # Chi square critical value at/above which a block is counted
    CHI_CUTOFF = 512

    DESCRIPTION = "Statistical Compression Analysis"

    def __init__(self, fname, start, length, binwalk=None):
        '''
        Class constructor.

        @fname   - The file to scan.
        @start   - The start offset to begin analysis at.
        @length  - The number of bytes to analyze.
        @binwalk - Binwalk class object.

        Returns None.
        '''
        self.fp = common.BlockFile(fname, 'r', offset=start, length=length)

        # Read block size must be at least as large as our analysis block size
        if self.fp.READ_BLOCK_SIZE < self.BLOCK_SIZE:
            self.fp.READ_BLOCK_SIZE = self.BLOCK_SIZE

        # BlockFile may have adjusted the requested offset; trust its value
        self.start = self.fp.offset
        self.length = length
        self.binwalk = binwalk

    def __del__(self):
        # Best-effort close; never let destructor errors propagate
        try:
            self.fp.close()
        except KeyboardInterrupt as e:
            raise e
        except Exception:
            pass

    def analyze(self):
        '''
        Perform analysis and interpretation.

        Returns a one-element list containing a result dictionary with
        'offset' and 'description' keys describing the attempted
        interpretation.
        '''
        i = 0
        num_error = 0
        analyzer_results = []   # (unused)

        if self.binwalk:
            self.binwalk.display.header(file_name=self.fp.name, description=self.DESCRIPTION)

        chi = ChiSquare()

        # NOTE(review): assumes read_block keeps returning data until
        # self.length bytes have been consumed; a zero-length read while
        # i < self.length would loop forever — confirm BlockFile semantics.
        while i < self.length:
            j = 0
            (d, dlen) = self.fp.read_block()

            # Run a chi square test over each BLOCK_SIZE chunk of this read
            while j < dlen:
                chi.reset()

                data = d[j:j+self.BLOCK_SIZE]
                if len(data) < self.BLOCK_SIZE:
                    # Ignore the trailing partial chunk
                    break

                chi.update(data)

                if chi.chisq() >= self.CHI_CUTOFF:
                    num_error += 1

                j += self.BLOCK_SIZE

            i += dlen

        # Blocks deviating from the ideal distribution suggest compression;
        # data that looks uniformly random is assumed to be encrypted.
        if num_error > 0:
            verdict = 'Moderate entropy data, best guess: compressed'
        else:
            verdict = 'High entropy data, best guess: encrypted'

        result = [{'offset' : self.start, 'description' : '%s, size: %d, %d low entropy blocks' % (verdict, self.length, num_error)}]

        if self.binwalk:
            self.binwalk.display.results(self.start, result)
            self.binwalk.display.footer()

        return result
......@@ -5,7 +5,7 @@ import re
import ast
import hashlib
import operator as op
from binwalk.compat import *
from binwalk.core.compat import *
# This allows other modules/scripts to subclass BlockFile from a custom class. Defaults to io.FileIO.
if has_key(__builtins__, 'BLOCK_FILE_PARENT_CLASS'):
......
import os
import binwalk.common as common
from binwalk.compat import *
import binwalk.core.common as common
from binwalk.core.compat import *
class Config:
'''
......@@ -117,7 +117,7 @@ class Config:
root = __file__
if os.path.islink(root):
root = os.path.realpath(root)
return os.path.dirname(os.path.abspath(root))
return os.path.dirname(os.path.dirname(os.path.abspath(root)))
except KeyboardInterrupt as e:
raise e
except Exception:
......
import re
import binwalk.common as common
from binwalk.smartsignature import SmartSignature
from binwalk.compat import *
import binwalk.core.common as common
from binwalk.core.smartsignature import SmartSignature
from binwalk.core.compat import *
class MagicFilter:
'''
......
......@@ -3,12 +3,12 @@ import os
import sys
import inspect
import argparse
import binwalk.common
import binwalk.config
import binwalk.plugin
from binwalk.compat import *
import binwalk.core.common
import binwalk.core.config
import binwalk.core.plugin
from binwalk.core.compat import *
class ModuleOption(object):
class Option(object):
'''
A container class that allows modules to declare command line options.
'''
......@@ -22,7 +22,7 @@ class ModuleOption(object):
@description - A description to be displayed in the help output.
@short - The short option to use (optional).
@long - The long option to use (if None, this option will not be displayed in help output).
@type - The accepted data type (one of: io.FileIO/argparse.FileType/binwalk.common.BlockFile, list, str, int, float).
@type - The accepted data type (one of: io.FileIO/argparse.FileType/binwalk.core.common.BlockFile, list, str, int, float).
@dtype - The displayed accepted type string, to be shown in help output.
Returns None.
......@@ -36,14 +36,14 @@ class ModuleOption(object):
self.dtype = str(dtype)
if not self.dtype:
if self.type in [io.FileIO, argparse.FileType, binwalk.common.BlockFile]:
if self.type in [io.FileIO, argparse.FileType, binwalk.core.common.BlockFile]:
self.dtype = 'file'
elif self.type in [int, float, str]:
self.dtype = self.type.__name__
else:
self.dtype = str.__name__
class ModuleKwarg(object):
class Kwarg(object):
'''
A container class allowing modules to specify their expected __init__ kwarg(s).
'''
......@@ -93,12 +93,12 @@ class Result(object):
class Error(Result):
'''
A subclass of binwalk.module.Result.
A subclass of binwalk.core.module.Result.
'''
def __init__(self, **kwargs):
'''
Accepts all the same kwargs as binwalk.module.Result, but the following are also added:
Accepts all the same kwargs as binwalk.core.module.Result, but the following are also added:
@exception - In case of an exception, this is the exception object.
......@@ -114,13 +114,13 @@ class Module(object):
# The module title, as displayed in help output
TITLE = ""
# A list of binwalk.module.ModuleOption command line options
# A list of binwalk.core.module.ModuleOption command line options
CLI = []
# A list of binwalk.module.ModuleKwargs accepted by __init__
# A list of binwalk.core.module.ModuleKwargs accepted by __init__
KWARGS = []
# A dictionary of module dependencies; all modules depend on binwalk.modules.configuration.Configuration
# A dictionary of module dependencies; all modules depend on binwalk.core.modules.configuration.Configuration
DEPENDS = {'config' : 'Configuration', 'extractor' : 'Extractor'}
# Format string for printing the header during a scan
......@@ -144,7 +144,7 @@ class Module(object):
self.results = []
self.status = None
self.name = self.__class__.__name__
self.plugins = binwalk.plugin.Plugins(self)
self.plugins = binwalk.core.plugin.Plugins(self)
process_kwargs(self, kwargs)
......@@ -187,11 +187,11 @@ class Module(object):
'''
return False
def process_result(self, r):
def callback(self, r):
'''
Processes the result. Passed to all dependency modules when a valid result is found.
Processes the result from all modules. Called for all dependency modules when a valid result is found.
@r - The result, an instance of binwalk.module.Result.
@r - The result, an instance of binwalk.core.module.Result.
Returns None.
'''
......@@ -202,7 +202,7 @@ class Module(object):
Validates the result.
May be overridden by the module sub-class.
@r - The result, an instance of binwalk.module.Result.
@r - The result, an instance of binwalk.core.module.Result.
Returns None.
'''
......@@ -235,9 +235,9 @@ class Module(object):
def result(self, r=None, **kwargs):
'''
Validates a result, stores it in self.results and prints it.
Accepts the same kwargs as the binwalk.module.Result class.
Accepts the same kwargs as the binwalk.core.module.Result class.
@r - An existing instance of binwalk.module.Result.
@r - An existing instance of binwalk.core.module.Result.
Returns None.
'''
......@@ -245,13 +245,14 @@ class Module(object):
r = Result(**kwargs)
self.validate(r)
for (attribute, module) in iterator(self.DEPENDS):
dependency = getattr(self, attribute)
dependency.callback(r)
self._plugins_result(r)
if r.valid:
for (attribute, module) in iterator(self.DEPENDS):
dependency = getattr(self, attribute)
dependency.process_result(r)
self.results.append(r)
# Update the progress status automatically if it is not being done manually by the module
......@@ -268,7 +269,7 @@ class Module(object):
'''
Stores the specified error in self.errors.
Accepts the same kwargs as the binwalk.module.Error class.
Accepts the same kwargs as the binwalk.core.module.Error class.
Returns None.
'''
......@@ -331,7 +332,10 @@ class Module(object):
return retval
class Status(object):
'''
Class used for tracking module status (e.g., % complete).
'''
def __init__(self, **kwargs):
self.kwargs = kwargs
self.clear()
......@@ -391,17 +395,17 @@ class Modules(object):
Returns a list of modules that contain the specified attribute.
'''
import binwalk.modules
import binwalk.core.modules
modules = []
for (name, module) in inspect.getmembers(binwalk.modules):
for (name, module) in inspect.getmembers(binwalk.core.modules):
if inspect.isclass(module) and hasattr(module, attribute):
modules.append(module)
return modules
def help(self):
help_string = "\nBinwalk v%s\nCraig Heffner, http://www.binwalk.org\n" % binwalk.config.Config.VERSION
help_string = "\nBinwalk v%s\nCraig Heffner, http://www.binwalk.core.org\n" % binwalk.core.config.Config.VERSION
for obj in self.list(attribute="CLI"):
if obj.CLI:
......@@ -449,7 +453,7 @@ class Modules(object):
def run(self, module):
obj = self.load(module)
if isinstance(obj, binwalk.module.Module) and obj.enabled:
if isinstance(obj, binwalk.core.module.Module) and obj.enabled:
obj.main(status=self.status)
self.status.clear()
......@@ -465,17 +469,17 @@ class Modules(object):
return module(**kwargs)
def dependencies(self, module):
import binwalk.modules
import binwalk.core.modules
kwargs = {}
if hasattr(module, "DEPENDS"):
for (kwarg, dependency) in iterator(module.DEPENDS):
# The dependency module must be imported by binwalk.modules.__init__.py
if hasattr(binwalk.modules, dependency):
dependency = getattr(binwalk.modules, dependency)
# The dependency module must be imported by binwalk.core.modules.__init__.py
if hasattr(binwalk.core.modules, dependency):
dependency = getattr(binwalk.core.modules, dependency)
else:
sys.stderr.write("WARNING: %s depends on %s which was not found in binwalk.modules.__init__.py\n" % (str(module), dependency))
sys.stderr.write("WARNING: %s depends on %s which was not found in binwalk.core.modules.__init__.py\n" % (str(module), dependency))
continue
# No recursive dependencies, thanks
......@@ -530,7 +534,7 @@ class Modules(object):
# Only add parsed options pertinent to the requested module
for module_option in module.CLI:
if module_option.type == binwalk.common.BlockFile:
if module_option.type == binwalk.core.common.BlockFile:
for k in get_keys(module_option.kwargs):
kwargs[k] = []
......@@ -590,14 +594,14 @@ class Modules(object):
if not hasattr(module, k):
setattr(module, k, v)
else:
raise Exception("binwalk.module.Modules.process_kwargs: %s has no attribute 'KWARGS'" % str(module))
raise Exception("binwalk.core.module.Modules.process_kwargs: %s has no attribute 'KWARGS'" % str(module))
def process_kwargs(obj, kwargs):
'''
Convenience wrapper around binwalk.module.Modules.kwargs.
Convenience wrapper around binwalk.core.module.Modules.kwargs.
@obj - The class object (an instance of a sub-class of binwalk.module.Module).
@obj - The class object (an instance of a sub-class of binwalk.core.module.Module).
@kwargs - The kwargs provided to the object's __init__ method.
Returns None.
......@@ -606,7 +610,7 @@ def process_kwargs(obj, kwargs):
def show_help(fd=sys.stdout):
'''
Convenience wrapper around binwalk.module.Modules.help.
Convenience wrapper around binwalk.core.module.Modules.help.
@fd - An object with a write method (e.g., sys.stdout, sys.stderr, etc).
......
......@@ -2,8 +2,8 @@ import io
import re
import os.path
import tempfile
from binwalk.compat import *
from binwalk.common import str2int
from binwalk.core.compat import *
from binwalk.core.common import str2int
class MagicParser:
'''
......
import os
import sys
import imp
import binwalk.config
from binwalk.compat import *
import binwalk.core.config
from binwalk.core.compat import *
class Plugins:
'''
......@@ -54,7 +54,7 @@ class Plugins:
self.pre_scan = []
self.post_scan = []
self.parent = parent
self.config = binwalk.config.Config()
self.config = binwalk.core.config.Config()
def __del__(self):
pass
......
import re
import binwalk.module
from binwalk.compat import *
from binwalk.common import str2int, get_quoted_strings, MathExpression
import binwalk.core.module
from binwalk.core.compat import *
from binwalk.core.common import str2int, get_quoted_strings, MathExpression
class SmartSignature:
'''
......@@ -127,7 +127,7 @@ class SmartSignature:
results['valid'] = self.valid
return binwalk.module.Result(**results)
return binwalk.core.module.Result(**results)
def _is_valid(self, data):
'''
......
import zlib
import math
import os.path
import binwalk.plugins as plugins
import binwalk.common as common
import binwalk.compression as compression
from binwalk.compat import *
class PlotEntropy(object):
    '''
    Class to plot entropy data on a graph.
    All work is done in __init__; instantiating the class draws (or saves) the plot.
    '''

    XLABEL = 'Offset'
    YLABEL = 'Entropy'

    XUNITS = 'B'
    YUNITS = 'E'

    # Marker colors cycled through for distinct result descriptions
    COLORS = ['r', 'g', 'c', 'b', 'm']

    # Width (in pixels) and image format used when saving the plot to disk
    FILE_WIDTH = 1024
    FILE_FORMAT = 'png'

    def __init__(self, x, y, title='Entropy', average=0, file_results={}, show_legend=True, save=False):
        '''
        Plots entropy data.

        @x            - List of graph x-coordinates (i.e., data offsets).
        @y            - List of graph y-coordinates (i.e., entropy for each offset).
        @title        - Graph title.
        @average      - The average entropy.
        @file_results - Binwalk results, if any.
        @show_legend  - Set to False to not generate a color-coded legend and plotted x coordinates for the graph.
        @save         - If set to True, graph will be saved to disk rather than displayed.

        Returns None.
        '''
        # Imported lazily so pyqtgraph/numpy are only required when plotting
        import numpy as np
        import pyqtgraph as pg
        from pyqtgraph.Qt import QtCore, QtGui

        i = 0
        descriptions = {}
        plotted_colors = {}
        max_description_length = None

        # NOTE(review): despite the {} default, file_results is unpacked as
        # (offset, results) pairs below, so callers appear to pass a sequence
        # of (offset, results) tuples — confirm against callers.
        for (offset, results) in file_results:
            # Only the text before the first comma of the first result is shown
            description = results[0]['description'].split(',')[0]

            # Track the longest description to size the legend box
            desc_len = len(description)
            if not max_description_length or desc_len > max_description_length:
                max_description_length = desc_len

            if has_key(descriptions, offset):
                descriptions[offset].append(description)
            else:
                descriptions[offset] = [description]

        #pg.setConfigOption('background', 'w')
        #pg.setConfigOption('foreground', 'k')
        plt = pg.plot(title=title, clear=True)

        plt.plot(x, y, pen='y') #pen='b'

        if file_results and show_legend:
            plt.addLegend(size=(max_description_length*10, 0))

        # Don't really like the way pyqtgraph draws these infinite horizontal lines
        #if average:
        #    plt.addLine(y=average, pen='r')

        if descriptions:
            ordered_offsets = get_keys(descriptions)
            ordered_offsets.sort()

            for offset in ordered_offsets:
                for description in descriptions[offset]:
                    # If this description has already been plotted at a different offset, we need to
                    # use the same color for the marker, but set the description to None to prevent
                    # duplicate entries in the graph legend.
                    #
                    # Else, get the next color and use it to mark descriptions of this type.
                    if has_key(plotted_colors, description):
                        color = plotted_colors[description]
                        description = None
                    else:
                        color = self.COLORS[i]
                        plotted_colors[description] = color

                        # Cycle back to the first color once all are used
                        i += 1
                        if i >= len(self.COLORS):
                            i = 0

                    # Draw a vertical marker line at this offset
                    plt.plot(x=[offset,offset], y=[0,1.1], name=description, pen=pg.mkPen(color, width=2.5))

        if save:
            exporter = pg.exporters.ImageExporter.ImageExporter(plt.plotItem)
            exporter.parameters()['width'] = self.FILE_WIDTH
            exporter.export(common.unique_file_name(title, self.FILE_FORMAT))
        else:
            # Only set the axis labels if we're displaying a live window (axis labels aren't well-placed when saving directly to file)
            plt.setLabel('left', self.YLABEL, units=self.YUNITS)
            plt.setLabel('bottom', self.XLABEL, units=self.XUNITS)
            QtGui.QApplication.instance().exec_()
class FileEntropy(object):
    '''
    Class for analyzing and plotting data entropy for a file.
    Preferred to use the Entropy class instead of calling FileEntropy directly.
    '''

    # Default number of bytes per entropy measurement
    DEFAULT_BLOCK_SIZE = 1024
    # Entropy level (0.0 - 1.0) at/above which a block is considered high entropy
    ENTROPY_TRIGGER = 0.9
    ENTROPY_MAX = 0.95

    def __init__(self, file_name=None, binwalk=None, offset=0, length=None, block=DEFAULT_BLOCK_SIZE, plugins=None, file_results=[], compcheck=False):
        '''
        Class constructor.

        @file_name    - The path to the file to analyze.
        @binwalk      - An instance of the Binwalk class.
        @offset       - The offset into the data to begin analysis.
        @length       - The number of bytes to analyze.
        @block        - The size of the data blocks to analyze.
        @plugins      - Instance of the Plugins class.
        @file_results - Scan results to overlay on the entropy plot graph.
        @compcheck    - Set to True to enable entropy compression detection.

        Returns None.
        '''
        # NOTE(review): file_results uses a mutable default argument; callers
        # relying on a fresh list should pass one explicitly.
        self.start = offset
        self.length = length
        self.block = block
        self.binwalk = binwalk
        self.plugins = plugins
        # Running count of bytes consumed, plus the cached read-ahead block state
        self.total_read = 0
        self.current_data_block = ''
        self.current_data_block_len = 0
        self.current_data_block_offset = 0
        self.file_results = file_results
        self.do_chisq = compcheck

        if file_name is None:
            raise Exception("Entropy.__init__ requires at least the file_name option")

        # Normalize falsy values to sane defaults
        if not self.length:
            self.length = 0

        if not self.start:
            self.start = 0

        if not self.block:
            self.block = self.DEFAULT_BLOCK_SIZE

        self.fd = common.BlockFile(file_name, 'r', offset=self.start, length=self.length)
        # BlockFile may have adjusted the requested offset; trust its value
        self.start = self.fd.offset
        self.fd.MAX_TRAILING_SIZE = 0

        # Read block size must be at least as large as the analysis block size
        if self.fd.READ_BLOCK_SIZE < self.block:
            self.fd.READ_BLOCK_SIZE = self.block

        if self.binwalk:
            # Set the total_scanned and scan_length values for plugins and status display messages
            self.binwalk.total_scanned = 0
            self.binwalk.scan_length = self.fd.length

    def __enter__(self):
        # Context-manager support: 'with FileEntropy(...) as e:'
        return self

    def __del__(self):
        self.cleanup()

    def __exit__(self, t, v, traceback):
        self.cleanup()

    def cleanup(self):
        '''
        Clean up any open file objects.
        Called internally by __del__ and __exit__.

        Returns None.
        '''
        # Best-effort close; never let cleanup errors propagate
        try:
            self.fd.close()
        except KeyboardInterrupt as e:
            raise e
        except Exception:
            pass

    def _read_block(self):
        # Returns the next analysis block as (dlen, data, absolute_offset).
        # dlen == 0 signals end of data.
        offset = self.total_read

        # Refill the cached read block once it has been fully consumed
        if self.current_data_block_offset >= self.current_data_block_len:
            self.current_data_block_offset = 0
            (self.current_data_block, self.current_data_block_len) = self.fd.read_block()

        # Only hand out full-sized blocks; a short remainder ends the scan
        if self.current_data_block and (self.current_data_block_len-self.current_data_block_offset) >= self.block:
            data = self.current_data_block[self.current_data_block_offset:self.current_data_block_offset+self.block]
            dlen = self.block
        else:
            data = ''
            dlen = 0

        self.current_data_block_offset += dlen
        self.total_read += dlen

        # Keep the progress display in sync
        if self.binwalk:
            self.binwalk.total_scanned = self.total_read

        return (dlen, data, offset+self.start)

    def gzip(self, offset, data, truncate=True):
        '''
        Performs an entropy analysis based on zlib compression ratio.
        This is faster than the shannon entropy analysis, but not as accurate.

        @offset   - Offset of the data block (unused; kept for a uniform algorithm signature).
        @data     - The block of data to measure.
        @truncate - Clamp the ratio to a maximum of 1.0.

        Returns the compression ratio as a float.
        '''
        # Entropy is a simple ratio of: <zlib compressed size> / <original size>
        e = float(float(len(zlib.compress(data, 9))) / float(len(data)))

        if truncate and e > 1.0:
            e = 1.0

        return e

    def shannon(self, offset, data):
        '''
        Performs a Shannon entropy analysis on a given block of data.

        @offset - Offset of the data block (unused; kept for a uniform algorithm signature).
        @data   - The block of data to measure.

        Returns the entropy, normalized to the range 0.0 - 1.0.
        '''
        entropy = 0
        dlen = len(data)

        if not data:
            return 0

        for x in range(256):
            p_x = float(data.count(chr(x))) / dlen
            if p_x > 0:
                entropy += - p_x*math.log(p_x, 2)

        # Max Shannon entropy for byte data is 8 bits; normalize to 0.0 - 1.0
        return (entropy / 8)

    def _do_analysis(self, algorithm):
        '''
        Performs an entropy analysis using the provided algorithm.

        @algorithm - A function/method to call which returns an entropy value.

        Returns a tuple of ([x-coordinates], [y-coordinates], average_entropy), where:

                o x-coordinates = A list of offsets analyzed inside the data.
                o y-coordinates = A corresponding list of entropy for each offset.
        '''
        offsets = []
        entropy = []
        average = 0
        total = 0
        self.total_read = 0
        plug_ret = plugins.PLUGIN_CONTINUE
        plug_pre_ret = plugins.PLUGIN_CONTINUE

        if self.plugins:
            plug_pre_ret = self.plugins._pre_scan_callbacks(self.fd)

        # Loop until a plugin requests termination or we run out of full blocks
        while not ((plug_pre_ret | plug_ret) & plugins.PLUGIN_TERMINATE):
            (dlen, data, offset) = self._read_block()
            if not dlen or not data:
                break

            e = algorithm(offset, data)
            results = {'description' : '%f' % e, 'offset' : offset}

            if self.plugins:
                plug_ret = self.plugins._scan_callbacks(results)
                # Plugins may have rewritten the offset and/or entropy value
                offset = results['offset']
                e = float(results['description'])

            if not ((plug_pre_ret | plug_ret) & (plugins.PLUGIN_TERMINATE | plugins.PLUGIN_NO_DISPLAY)):
                # During a compression check (-H), display is deferred to
                # _look_for_compression rather than shown per-block here
                if self.binwalk and not self.do_chisq:
                    self.binwalk.display.results(offset, [results])

                entropy.append(e)
                offsets.append(offset)
                total += e

        try:
            # This results in a divide by zero if one/all plugins returns PLUGIN_TERMINATE or PLUGIN_NO_DISPLAY,
            # or if the file being scanned is a zero-size file.
            average = float(float(total) / float(len(offsets)))
        except KeyboardInterrupt as e:
            raise e
        except Exception:
            pass

        if self.plugins:
            self.plugins._post_scan_callbacks(self.fd)

        if self.do_chisq:
            self._look_for_compression(offsets, entropy)

        return (offsets, entropy, average)

    def _look_for_compression(self, x, y):
        '''
        Analyzes areas of high entropy for signs of compression or encryption and displays the results.

        @x - List of offsets produced by the entropy analysis.
        @y - Corresponding list of entropy values (0.0 - 1.0).

        Returns None; appends (offset, results) tuples to self.file_results.
        '''
        trigger = self.ENTROPY_TRIGGER
        pairs = []
        scan_pairs = []
        index = -1
        total = 0

        # Only run the check if the caller didn't supply existing results
        if not self.file_results:
            # Collect [start, stop] offset pairs where entropy rises above,
            # then later falls back to/below, the trigger level
            for j in range(0, len(x)):
                if y[j] >= trigger and (j == 0 or y[j-1] < trigger):
                    pairs.append([x[j]])
                    index = len(pairs) - 1
                elif y[j] <= trigger and y[j-1] > trigger and index > -1 and len(pairs[index]) == 1:
                    pairs[index].append(x[j])

            # Generate a list of tuples containing the starting offset to begin analysis plus a length
            for pair in pairs:
                start = pair[0]

                if len(pair) == 2:
                    stop = pair[1]
                else:
                    # High entropy ran to the end of the data; use the file end as the stop offset
                    self.fd.seek(0, 2)
                    stop = self.fd.tell()

                length = stop - start
                total += length
                scan_pairs.append((start, length))

            # Update the binwalk scan length and total scanned values so that the percent complete
            # isn't stuck at 100% after the initial entropy analysis (which has already finished).
            if self.binwalk and total > 0:
                self.binwalk.scan_length = total
                self.binwalk.total_scanned = 0

            # Analyze each scan pair and display the results
            for (start, length) in scan_pairs:
                # Ignore anything less than 4KB in size
                if length > (self.DEFAULT_BLOCK_SIZE * 4):
                    # Ignore the first and last 1KB of data to prevent header/footer or extra data from skewing results
                    result = compression.CompressionEntropyAnalyzer(self.fd.name, start+self.DEFAULT_BLOCK_SIZE, length-self.DEFAULT_BLOCK_SIZE).analyze()

                    results = [{'description' : result[0]['description'], 'offset' : start}]

                    self.file_results.append((start, results))
                    if self.binwalk:
                        self.binwalk.display.results(start, results)

                # Keep the total scanned length updated
                if self.binwalk:
                    self.binwalk.total_scanned += length

    def analyze(self, algorithm=None):
        '''
        Performs an entropy analysis of the data using the specified algorithm.

        @algorithm - A method inside of the Entropy class to invoke for entropy analysis.
                     Default method: self.shannon.
                     Other available methods: self.gzip.
                     May also be a string: 'gzip'.

        Returns the return value of algorithm.
        '''
        algo = self.shannon

        if algorithm:
            if callable(algorithm):
                algo = algorithm

            # Non-callable values may name an algorithm ('gzip'); anything
            # without a .lower() method silently falls back to shannon
            try:
                if algorithm.lower() == 'gzip':
                    algo = self.gzip
            except KeyboardInterrupt as e:
                raise e
            except Exception:
                pass

        return self._do_analysis(algo)

    def plot(self, x, y, average=0, show_legend=True, save=False):
        '''
        Plots entropy data.

        @x           - List of graph x-coordinates (i.e., data offsets).
        @y           - List of graph y-coordinates (i.e., entropy for each offset).
        @average     - The average entropy.
        @show_legend - Set to False to not generate a color-coded legend and plotted x coordinates for the graph.
        @save        - If set to True, graph will be saved to disk rather than displayed.

        Returns None.
        '''
        PlotEntropy(x, y, self.fd.name, average, self.file_results, show_legend, save)
class Entropy(object):
    '''
    Class for analyzing and plotting data entropy for multiple files.

    A simple example of performing a binwalk scan and overlaying the binwalk scan results on the
    resulting entropy analysis graph:

        import sys
        import binwalk

        bwalk = binwalk.Binwalk()
        scan_results = bwalk.scan(sys.argv[1])

        with binwalk.entropy.Entropy(scan_results, bwalk) as e:
            e.analyze()

        bwalk.cleanup()
    '''

    DESCRIPTION = "ENTROPY ANALYSIS"
    ALT_DESCRIPTION = "HEURISTIC ANALYSIS"
    ENTROPY_SCAN = 'entropy'

    def __init__(self, files, binwalk=None, offset=0, length=0, block=0, plot=True, legend=True, save=False, algorithm=None, load_plugins=True, whitelist=None, blacklist=None, compcheck=False):
        '''
        Class constructor.

        @files        - A dictionary containing file names and results data, as returned by Binwalk.scan.
        @binwalk      - An instance of the Binwalk class.
        @offset       - The offset into the data to begin analysis.
        @length       - The number of bytes to analyze.
        @block        - The size of the data blocks to analyze.
        @plot         - Set to False to disable plotting.
        @legend       - Set to False to exclude the legend and custom offset markers from the plot.
        @save         - Set to True to save plots to disk instead of displaying them.
        @algorithm    - Set to 'gzip' to use the gzip entropy "algorithm".
        @load_plugins - Set to False to disable plugin callbacks.
        @whitelist    - A list of whitelisted plugins.
        @blacklist    - A list of blacklisted plugins.
        @compcheck    - Set to True to enable entropy compression detection.

        Returns None.
        '''
        self.files = files
        self.binwalk = binwalk
        self.offset = offset
        self.length = length
        self.block = block
        self.plot = plot
        self.legend = legend
        self.save = save
        self.algorithm = algorithm
        self.plugins = None
        self.load_plugins = load_plugins
        # Build fresh lists when no whitelist/blacklist was supplied; using
        # mutable default arguments would share one list across ALL instances.
        self.whitelist = whitelist if whitelist is not None else []
        self.blacklist = blacklist if blacklist is not None else []
        self.compcheck = compcheck

        # Multiple plots can't be displayed simultaneously, so always save
        # them to disk when analyzing more than one file.
        if len(self.files) > 1:
            self.save = True

        if self.binwalk:
            self.binwalk.scan_type = self.binwalk.ENTROPY

    def __enter__(self):
        return self

    def __exit__(self, t, v, traceback):
        return None

    def __del__(self):
        return None

    def set_entropy_algorithm(self, algorithm):
        '''
        Specify a function/method to call for determining data entropy.

        @algorithm - The function/method to call. This will be passed two arguments:
                     the file offset of the data block, and a data block (type 'str').
                     It must return a single floating point entropy value from 0.0 and 1.0, inclusive.

        Returns None.
        '''
        self.algorithm = algorithm

    def analyze(self):
        '''
        Perform an entropy analysis on the target files.

        Returns a dictionary of:

            {
                'file_name' : ([list, of, offsets], [list, of, entropy], average_entropy)
            }
        '''
        results = {}

        # Plugins are only loaded when driven by a Binwalk instance.
        if self.binwalk and self.load_plugins:
            self.plugins = plugins.Plugins(self.binwalk, whitelist=self.whitelist, blacklist=self.blacklist)

        for (file_name, overlay) in iterator(self.files):
            if self.plugins:
                self.plugins._load_plugins()

            if self.binwalk:
                # Heuristic (compression detection) scans get a different header.
                if self.compcheck:
                    desc = self.ALT_DESCRIPTION
                else:
                    desc = self.DESCRIPTION

                self.binwalk.display.header(file_name=file_name, description=desc)

            # Analyze one file at a time; overlay holds prior scan results to
            # draw on top of the entropy graph.
            with FileEntropy(file_name=file_name, binwalk=self.binwalk, offset=self.offset, length=self.length, block=self.block, plugins=self.plugins, file_results=overlay, compcheck=self.compcheck) as e:
                (x, y, average) = e.analyze(self.algorithm)

                if self.plot or self.save:
                    e.plot(x, y, average, self.legend, self.save)

                results[file_name] = (x, y, average)

            if self.binwalk:
                self.binwalk.display.footer()

        if self.plugins:
            del self.plugins
            self.plugins = None

        return results
import os
import re
import sys
import shlex
import tempfile
import subprocess
from binwalk.compat import *
from binwalk.config import *
from binwalk.common import file_size, unique_file_name, BlockFile
class Extractor:
    '''
    Extractor class, responsible for extracting files from the target file and executing external applications, if requested.
    An instance of this class is accessible via the Binwalk.extractor object.

    Example usage:

        import binwalk

        bw = binwalk.Binwalk()

        # Create extraction rules for scan results containing the string 'gzip compressed data' and 'filesystem'.
        # The former will be saved to disk with a file extension of 'gz' and the command 'gunzip <file name on disk>' will be executed (note the %e placeholder).
        # The latter will be saved to disk with a file extension of 'fs' and no command will be executed.
        # These rules will be ignored if there were previous rules with the same match string.
        bw.extractor.add_rule(['gzip compressed data:gz:gunzip %e', 'filesystem:fs'])

        # Load the extraction rules from the default extract.conf file(s).
        bw.extractor.load_defaults()

        # Run the binwalk scan.
        bw.scan('firmware.bin')
    '''

    # Extract rules are delimited with a colon.
    # <case insensitive matching string>:<file extension>[:<command to run>]
    RULE_DELIM = ':'

    # Comments in the extract.conf files start with a pound
    COMMENT_DELIM = '#'

    # Place holder for the extracted file name in the command
    FILE_NAME_PLACEHOLDER = '%e'

    # Max size of data to read/write at one time when extracting data
    MAX_READ_SIZE = 10 * 1024 * 1024

    def __init__(self, verbose=False, exec_commands=True, max_size=None):
        '''
        Class constructor.

        @verbose       - Set to True to display the output from any executed external applications.
        @exec_commands - Set to False to disable the execution of external utilities when extracting data from files.
        @max_size      - Limit the size of extracted files to max_size.

        Returns None.
        '''
        self.config = Config()
        self.enabled = False
        self.delayed = True
        self.verbose = verbose
        self.max_size = max_size
        self.exec_commands = exec_commands
        self.extract_rules = []
        self.remove_after_execute = False
        self.extract_path = os.getcwd()

    def append_rule(self, r):
        # Registering any rule enables extraction. Store a copy so later
        # mutation of the caller's dictionary can't affect the stored rule.
        self.enabled = True
        self.extract_rules.append(r.copy())

    def add_rule(self, txtrule=None, regex=None, extension=None, cmd=None):
        '''
        Adds a set of rules to the extraction rule list.

        @txtrule   - Rule string, or list of rule strings, in the format <regular expression>:<file extension>[:<command to run>]
        @regex     - If rule string is not specified, this is the regular expression string to use.
        @extension - If rule string is not specified, this is the file extension to use.
        @cmd       - If rule string is not specified, this is the command to run.
                     Alternatively a callable object may be specified, which will be passed one argument: the path to the file to extract.

        Returns None.
        '''
        # Process single explicitly specified rule
        if not txtrule and regex and extension:
            r = {
                'extension' : extension,
                'cmd'       : cmd if cmd else '',
                'regex'     : re.compile(regex),
            }
            self.append_rule(r)
            return

        # Process rule string, or list of rule strings
        if not isinstance(txtrule, type([])):
            rules = [txtrule]
        else:
            rules = txtrule

        for rule in rules:
            # Build a fresh rule dictionary and match flag for every rule
            # string; reusing them across iterations allowed a malformed
            # rule to silently re-register the previous rule's regex.
            r = {
                'extension' : '',
                'cmd'       : '',
                'regex'     : None,
            }
            match = False

            try:
                values = self._parse_rule(rule)
                match = values[0]
                r['regex'] = re.compile(values[0])
                r['extension'] = values[1]
                r['cmd'] = values[2]
            except KeyboardInterrupt as e:
                raise e
            except Exception:
                # Missing extension/cmd fields (IndexError) are optional;
                # a completely unparseable rule leaves match falsy below.
                pass

            # Verify that the match string was retrieved.
            if match:
                self.append_rule(r)

    def remove_rule(self, text):
        '''
        Remove all rules that match a specified text.

        @text - The text to match against.

        Returns the number of rules removed.
        '''
        rm = [i for i in range(len(self.extract_rules)) if self.extract_rules[i]['regex'].match(text)]

        # Pop from the highest index down; popping in ascending order would
        # shift the positions of the rules still waiting to be removed.
        for i in reversed(rm):
            self.extract_rules.pop(i)

        return len(rm)

    def clear_rules(self):
        '''
        Deletes all extraction rules.

        Returns None.
        '''
        self.extract_rules = []
        self.enabled = False

    def get_rules(self):
        '''
        Returns a list of all extraction rules.
        '''
        return self.extract_rules

    def enable_delayed_extract(self, tf=None):
        '''
        Enables / disables the delayed extraction feature.
        This feature ensures that certian supported file types will not contain extra data at the end of the
        file when they are extracted, but also means that these files will not be extracted until the end of the scan.

        @tf - Set to True to enable, False to disable.

        Returns the current delayed extraction setting.
        '''
        if tf is not None:
            self.delayed = tf
        return self.delayed

    def load_from_file(self, fname):
        '''
        Loads extraction rules from the specified file.

        @fname - Path to the extraction rule file.

        Returns None.
        '''
        try:
            # Process each line from the extract file, ignoring comments
            with open(fname, 'r') as f:
                for rule in f.readlines():
                    self.add_rule(rule.split(self.COMMENT_DELIM, 1)[0])
        except KeyboardInterrupt as e:
            raise e
        except Exception as e:
            raise Exception("Extractor.load_from_file failed to load file '%s': %s" % (fname, str(e)))

    def load_defaults(self):
        '''
        Loads default extraction rules from the user and system extract.conf files.

        Returns None.
        '''
        # Load the user extract file first to ensure its rules take precedence.
        extract_files = [
            self.config.paths['user'][self.config.EXTRACT_FILE],
            self.config.paths['system'][self.config.EXTRACT_FILE],
        ]

        for extract_file in extract_files:
            try:
                self.load_from_file(extract_file)
            except KeyboardInterrupt as e:
                raise e
            except Exception as e:
                # A missing conf file is only an error in verbose mode.
                if self.verbose:
                    raise Exception("Extractor.load_defaults failed to load file '%s': %s" % (extract_file, str(e)))

    def output_directory(self, path):
        '''
        Set the output directory for extracted files.

        @path - The extraction path.

        Returns None.
        '''
        self.extract_path = path

    def cleanup_extracted_files(self, tf=None):
        '''
        Set the action to take after a file is extracted.

        @tf - If set to True, extracted files will be cleaned up after running a command against them.
              If set to False, extracted files will not be cleaned up after running a command against them.
              If set to None or not specified, the current setting will not be changed.

        Returns the current cleanup status (True/False).
        '''
        if tf is not None:
            self.remove_after_execute = tf
        return self.remove_after_execute

    def extract(self, offset, description, file_name, size, name=None):
        '''
        Extract an embedded file from the target file, if it matches an extract rule.
        Called automatically by Binwalk.scan().

        @offset      - Offset inside the target file to begin the extraction.
        @description - Description of the embedded file to extract, as returned by libmagic.
        @file_name   - Path to the target file.
        @size        - Number of bytes to extract.
        @name        - Name to save the file as.

        Returns the name of the extracted file (blank string if nothing was extracted).
        '''
        fname = ''
        cleanup_extracted_fname = True
        original_dir = os.getcwd()
        rules = self._match(description)

        # No extraction rules for this file; return the promised blank string
        # (previously returned None, contradicting the docstring).
        if not rules:
            return fname

        if not os.path.exists(self.extract_path):
            os.mkdir(self.extract_path)

        file_path = os.path.realpath(file_name)

        if os.path.isfile(file_path):
            os.chdir(self.extract_path)

            # Loop through each extraction rule until one succeeds
            for i in range(0, len(rules)):
                rule = rules[i]

                # Copy out the data to disk, if we haven't already
                fname = self._dd(file_path, offset, size, rule['extension'], output_file_name=name)

                # If there was a command specified for this rule, try to execute it.
                # If execution fails, the next rule will be attempted.
                if rule['cmd']:

                    # Many extraction utilities will extract the file to a new file, just without
                    # the file extension (i.e., myfile.7z -> myfile). If the presumed resulting
                    # file name already exists before executing the extract command, do not attempt
                    # to clean it up even if its resulting file size is 0.
                    if self.remove_after_execute:
                        extracted_fname = os.path.splitext(fname)[0]
                        if os.path.exists(extracted_fname):
                            cleanup_extracted_fname = False

                    # Execute the specified command against the extracted file
                    extract_ok = self.execute(rule['cmd'], fname)

                    # Only clean up files if remove_after_execute was specified
                    if extract_ok and self.remove_after_execute:

                        # Remove the original file that we extracted
                        try:
                            os.unlink(fname)
                        except KeyboardInterrupt as e:
                            raise e
                        except Exception:
                            pass

                        # If the command worked, assume it removed the file extension from the extracted file
                        # If the extracted file name file exists and is empty, remove it
                        if cleanup_extracted_fname and os.path.exists(extracted_fname) and file_size(extracted_fname) == 0:
                            try:
                                os.unlink(extracted_fname)
                            except KeyboardInterrupt as e:
                                raise e
                            except Exception:
                                pass

                    # If the command executed OK, don't try any more rules
                    if extract_ok:
                        break
                    # Else, remove the extracted file if this isn't the last rule in the list.
                    # If it is the last rule, leave the file on disk for the user to examine.
                    elif i != (len(rules) - 1):
                        try:
                            os.unlink(fname)
                        except KeyboardInterrupt as e:
                            raise e
                        except Exception:
                            pass

                # If there was no command to execute, just use the first rule
                else:
                    break

        os.chdir(original_dir)

        # If a file was extracted, return the full path to that file
        if fname:
            fname = os.path.join(self.extract_path, fname)

        return fname

    def delayed_extract(self, results, file_name, size):
        '''
        Performs a delayed extraction (see self.enable_delayed_extract).
        Called internally by Binwalk.Scan().

        @results   - A list of dictionaries of all the scan results.
        @file_name - The path to the scanned file.
        @size      - The size of the scanned file.

        Returns an updated results list containing the names of the newly extracted files.
        '''
        index = 0
        nresults = results

        for (offset, infos) in results:
            info_count = 0
            ninfos = infos

            for info in infos:
                if info['delay']:
                    # The extraction end offset is the offset of the next
                    # result matching the 'delay' description, or EOF if
                    # no such result exists.
                    end_offset = self._entry_offset(index, results, info['delay'])
                    if end_offset == -1:
                        extract_size = size
                    else:
                        extract_size = (end_offset - offset)

                    ninfos[info_count]['extract'] = self.extract(offset, info['description'], file_name, extract_size, info['name'])
                    nresults[index] = (offset, ninfos)

                info_count += 1

            index += 1

        return nresults

    def _entry_offset(self, index, entries, description):
        '''
        Gets the offset of the first entry that matches the description.

        @index       - Index into the entries list to begin searching.
        @entries     - Dictionary of result entries.
        @description - Case insensitive description.

        Returns the offset, if a matching description is found.
        Returns -1 if a matching description is not found.
        '''
        description = description.lower()

        for (offset, infos) in entries[index:]:
            for info in infos:
                if info['description'].lower().startswith(description):
                    return offset
        return -1

    def _match(self, description):
        '''
        Check to see if the provided description string matches an extract rule.
        Called internally by self.extract().

        @description - Description string to check.

        Returns the associated rule dictionary if a match is found.
        Returns None if no match is found.
        '''
        rules = []
        description = description.lower()

        for rule in self.extract_rules:
            if rule['regex'].search(description):
                rules.append(rule)
        return rules

    def _parse_rule(self, rule):
        '''
        Parses an extraction rule.

        @rule - Rule string.

        Returns an array of ['<case insensitive matching string>', '<file extension>', '<command to run>'].
        '''
        # maxsplit=2 keeps any extra colons as part of the command field.
        return rule.strip().split(self.RULE_DELIM, 2)

    def _dd(self, file_name, offset, size, extension, output_file_name=None):
        '''
        Extracts a file embedded inside the target file.

        @file_name        - Path to the target file.
        @offset           - Offset inside the target file where the embedded file begins.
        @size             - Number of bytes to extract.
        @extension        - The file exension to assign to the extracted file on disk.
        @output_file_name - The requested name of the output file.

        Returns the extracted file name.
        '''
        total_size = 0
        # Default extracted file name is <hex offset>.<extension>
        default_bname = "%X" % offset

        # Honor the configured extraction size limit, if any.
        if self.max_size and size > self.max_size:
            size = self.max_size

        if not output_file_name:
            bname = default_bname
        else:
            # Strip the output file name of invalid/dangerous characters (like file paths)
            bname = os.path.basename(output_file_name)

        fname = unique_file_name(bname, extension)

        try:
            # Open the target file and seek to the offset
            fdin = BlockFile(file_name, 'r', length=size)
            fdin.seek(offset)

            # Open the output file
            try:
                fdout = BlockFile(fname, 'w')
            except KeyboardInterrupt as e:
                raise e
            except Exception:
                # Fall back to the default name if the requested name fails
                fname = unique_file_name(default_bname, extension)
                fdout = BlockFile(fname, 'w')

            # Copy the data out in blocks, stopping at the requested size.
            while total_size < size:
                (data, dlen) = fdin.read_block()
                fdout.write(str2bytes(data[:dlen]))
                total_size += dlen

            # Cleanup
            fdout.close()
            fdin.close()
        except KeyboardInterrupt as e:
            raise e
        except Exception as e:
            raise Exception("Extractor.dd failed to extract data from '%s' to '%s': %s" % (file_name, fname, str(e)))

        return fname

    def execute(self, cmd, fname):
        '''
        Execute a command against the specified file.

        @cmd   - Command to execute.
        @fname - File to run command against.

        Returns True on success, False on failure.
        '''
        tmp = None
        retval = True

        if not self.exec_commands:
            return retval

        try:
            if callable(cmd):
                try:
                    cmd(fname)
                except KeyboardInterrupt as e:
                    raise e
                except Exception as e:
                    sys.stderr.write("WARNING: Extractor.execute failed to run '%s': %s\n" % (str(cmd), str(e)))
            else:
                # If not in verbose mode, create a temporary file to redirect stdout and stderr to
                if not self.verbose:
                    tmp = tempfile.TemporaryFile()

                # Replace all instances of FILE_NAME_PLACEHOLDER in the command with fname
                cmd = cmd.replace(self.FILE_NAME_PLACEHOLDER, fname)

                # Execute.
                if subprocess.call(shlex.split(cmd), stdout=tmp, stderr=tmp) != 0:
                    retval = False
        except KeyboardInterrupt as e:
            raise e
        except Exception as e:
            # Silently ignore no such file or directory errors. Why? Because these will inevitably be raised when
            # making the switch to the new firmware mod kit directory structure. We handle this elsewhere, but it's
            # annoying to see this spammed out to the console every time.
            # Use getattr: not every Exception subclass carries an errno attribute.
            if getattr(e, 'errno', None) != 2:
                sys.stderr.write("WARNING: Extractor.execute failed to run '%s': %s\n" % (str(cmd), str(e)))
            retval = False

        if tmp is not None:
            tmp.close()

        return retval
from signature import Signature
from binvis import Plotter
from hexdiff import HexDiff
from hashmatch import HashMatch
from configuration import Configuration
from extractor import Extractor
from binwalk.modules.signature import Signature
from binwalk.modules.binvis import Plotter
from binwalk.modules.hexdiff import HexDiff
from binwalk.modules.hashmatch import HashMatch
from binwalk.modules.configuration import Configuration
from binwalk.modules.extractor import Extractor
import os
import binwalk.module
from binwalk.compat import *
from binwalk.common import BlockFile
from binwalk.core.compat import *
from binwalk.core.common import BlockFile
from binwalk.core.module import Module, Option, Kwarg
class Plotter(binwalk.module.Module):
class Plotter(Module):
'''
Base class for visualizing binaries in Qt.
Other plotter classes are derived from this.
......@@ -15,29 +15,29 @@ class Plotter(binwalk.module.Module):
TITLE = "Binary Visualization"
CLI = [
binwalk.module.ModuleOption(short='3',
long='3D',
kwargs={'axis' : 3, 'enabled' : True},
description='Generate a 3D binary visualization'),
binwalk.module.ModuleOption(short='2',
long='2D',
kwargs={'axis' : 2, 'enabled' : True},
description='Project data points onto 3D cube walls only'),
binwalk.module.ModuleOption(short='Z',
long='max-points',
type=int,
kwargs={'max_points' : 0},
description='Set the maximum number of plotted data points'),
binwalk.module.ModuleOption(short='V',
long='show-grids',
kwargs={'show_grids' : True},
description='Display the x-y-z grids in the resulting plot'),
Option(short='3',
long='3D',
kwargs={'axis' : 3, 'enabled' : True},
description='Generate a 3D binary visualization'),
Option(short='2',
long='2D',
kwargs={'axis' : 2, 'enabled' : True},
description='Project data points onto 3D cube walls only'),
Option(short='Z',
long='max-points',
type=int,
kwargs={'max_points' : 0},
description='Set the maximum number of plotted data points'),
Option(short='V',
long='show-grids',
kwargs={'show_grids' : True},
description='Display the x-y-z grids in the resulting plot'),
]
KWARGS = [
binwalk.module.ModuleKwarg(name='axis', default=3),
binwalk.module.ModuleKwarg(name='max_points', default=0),
binwalk.module.ModuleKwarg(name='show_grids', default=False),
Kwarg(name='axis', default=3),
Kwarg(name='max_points', default=0),
Kwarg(name='show_grids', default=False),
]
# There isn't really any useful data to print to console. Disable header and result output.
......
import os
import sys
import argparse
import binwalk.common
import binwalk.module
import binwalk.config
import binwalk.display
from binwalk.config import *
from binwalk.compat import *
import binwalk.core.common
import binwalk.core.config
import binwalk.core.display
from binwalk.core.config import *
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg, show_help
class Configuration(binwalk.module.Module):
class Configuration():
TITLE = "General"
DEPENDS = {}
CLI = [
binwalk.module.ModuleOption(long='length',
short='l',
type=int,
kwargs={'length' : 0},
description='Number of bytes to scan'),
binwalk.module.ModuleOption(long='offset',
short='o',
type=int,
kwargs={'offset' : 0},
description='Start scan at this file offset'),
binwalk.module.ModuleOption(long='block',
short='K',
type=int,
kwargs={'block' : 0},
description='Set file block size'),
binwalk.module.ModuleOption(long='swap',
short='g',
type=int,
kwargs={'swap_size' : 0},
description='Reverse every n bytes before scanning'),
binwalk.module.ModuleOption(long='log',
short='f',
type=argparse.FileType,
kwargs={'log_file' : None},
description='Log results to file'),
binwalk.module.ModuleOption(long='csv',
short='c',
kwargs={'csv' : True},
description='Log results to file in CSV format'),
binwalk.module.ModuleOption(long='term',
short='t',
kwargs={'format_to_terminal' : True},
description='Format output to fit the terminal window'),
binwalk.module.ModuleOption(long='quiet',
short='q',
kwargs={'quiet' : True},
description='Supress output to stdout'),
binwalk.module.ModuleOption(long='verbose',
short='v',
type=list,
kwargs={'verbose' : True},
description='Enable verbose output (specify twice for more verbosity)'),
binwalk.module.ModuleOption(short='h',
long='help',
kwargs={'show_help' : True},
description='Show help output'),
binwalk.module.ModuleOption(long=None,
short=None,
type=binwalk.common.BlockFile,
kwargs={'files' : []}),
Option(long='length',
short='l',
type=int,
kwargs={'length' : 0},
description='Number of bytes to scan'),
Option(long='offset',
short='o',
type=int,
kwargs={'offset' : 0},
description='Start scan at this file offset'),
Option(long='block',
short='K',
type=int,
kwargs={'block' : 0},
description='Set file block size'),
Option(long='swap',
short='g',
type=int,
kwargs={'swap_size' : 0},
description='Reverse every n bytes before scanning'),
Option(long='log',
short='f',
type=argparse.FileType,
kwargs={'log_file' : None},
description='Log results to file'),
Option(long='csv',
short='c',
kwargs={'csv' : True},
description='Log results to file in CSV format'),
Option(long='term',
short='t',
kwargs={'format_to_terminal' : True},
description='Format output to fit the terminal window'),
Option(long='quiet',
short='q',
kwargs={'quiet' : True},
description='Supress output to stdout'),
Option(long='verbose',
short='v',
type=list,
kwargs={'verbose' : True},
description='Enable verbose output (specify twice for more verbosity)'),
Option(short='h',
long='help',
kwargs={'show_help' : True},
description='Show help output'),
Option(long=None,
short=None,
type=binwalk.core.common.BlockFile,
kwargs={'files' : []}),
]
KWARGS = [
binwalk.module.ModuleKwarg(name='length', default=0),
binwalk.module.ModuleKwarg(name='offset', default=0),
binwalk.module.ModuleKwarg(name='block', default=0),
binwalk.module.ModuleKwarg(name='swap_size', default=0),
binwalk.module.ModuleKwarg(name='log_file', default=None),
binwalk.module.ModuleKwarg(name='csv', default=False),
binwalk.module.ModuleKwarg(name='format_to_terminal', default=False),
binwalk.module.ModuleKwarg(name='quiet', default=False),
binwalk.module.ModuleKwarg(name='verbose', default=[]),
binwalk.module.ModuleKwarg(name='files', default=[]),
binwalk.module.ModuleKwarg(name='show_help', default=False),
Kwarg(name='length', default=0),
Kwarg(name='offset', default=0),
Kwarg(name='block', default=0),
Kwarg(name='swap_size', default=0),
Kwarg(name='log_file', default=None),
Kwarg(name='csv', default=False),
Kwarg(name='format_to_terminal', default=False),
Kwarg(name='quiet', default=False),
Kwarg(name='verbose', default=[]),
Kwarg(name='files', default=[]),
Kwarg(name='show_help', default=False),
]
def load(self):
......@@ -87,15 +87,15 @@ class Configuration(binwalk.module.Module):
self._set_verbosity()
self._open_target_files()
self.settings = binwalk.config.Config()
self.display = binwalk.display.Display(log=self.log_file,
csv=self.csv,
quiet=self.quiet,
verbose=self.verbose,
fit_to_screen=self.format_to_terminal)
self.settings = binwalk.core.config.Config()
self.display = binwalk.core.display.Display(log=self.log_file,
csv=self.csv,
quiet=self.quiet,
verbose=self.verbose,
fit_to_screen=self.format_to_terminal)
if self.show_help:
binwalk.module.show_help()
show_help()
sys.exit(0)
def __del__(self):
......@@ -134,7 +134,7 @@ class Configuration(binwalk.module.Module):
if not os.path.isdir(tfile):
# Make sure we can open the target files
try:
fp = binwalk.common.BlockFile(tfile, length=self.length, offset=self.offset, swap=self.swap_size)
fp = binwalk.core.common.BlockFile(tfile, length=self.length, offset=self.offset, swap=self.swap_size)
self.target_files.append(fp)
except KeyboardInterrupt as e:
raise e
......
......@@ -4,9 +4,9 @@ import magic
import fnmatch
import ctypes
import ctypes.util
import binwalk.common
import binwalk.module
from binwalk.compat import *
import binwalk.core.common
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg
class HashResult(object):
'''
......@@ -19,7 +19,7 @@ class HashResult(object):
self.hash = hash
self.strings = strings
class HashMatch(binwalk.module.Module):
class HashMatch(Module):
'''
Class for fuzzy hash matching of files and directories.
'''
......@@ -29,50 +29,50 @@ class HashMatch(binwalk.module.Module):
TITLE = "Fuzzy Hash"
CLI = [
binwalk.module.ModuleOption(short='F',
long='fuzzy',
kwargs={'enabled' : True},
description='Perform fuzzy hash matching on files/directories'),
binwalk.module.ModuleOption(short='u',
long='cutoff',
priority=100,
type=int,
kwargs={'cutoff' : DEFAULT_CUTOFF},
description='Set the cutoff percentage'),
binwalk.module.ModuleOption(short='S',
long='strings',
kwargs={'strings' : True},
description='Diff strings inside files instead of the entire file'),
binwalk.module.ModuleOption(short='s',
long='same',
kwargs={'same' : True, 'cutoff' : CONSERVATIVE_CUTOFF},
description='Only show files that are the same'),
binwalk.module.ModuleOption(short='p',
long='diff',
kwargs={'same' : False, 'cutoff' : CONSERVATIVE_CUTOFF},
description='Only show files that are different'),
binwalk.module.ModuleOption(short='n',
long='name',
kwargs={'filter_by_name' : True},
description='Only compare files whose base names are the same'),
binwalk.module.ModuleOption(short='L',
long='symlinks',
kwargs={'symlinks' : True},
description="Don't ignore symlinks"),
Option(short='F',
long='fuzzy',
kwargs={'enabled' : True},
description='Perform fuzzy hash matching on files/directories'),
Option(short='u',
long='cutoff',
priority=100,
type=int,
kwargs={'cutoff' : DEFAULT_CUTOFF},
description='Set the cutoff percentage'),
Option(short='S',
long='strings',
kwargs={'strings' : True},
description='Diff strings inside files instead of the entire file'),
Option(short='s',
long='same',
kwargs={'same' : True, 'cutoff' : CONSERVATIVE_CUTOFF},
description='Only show files that are the same'),
Option(short='p',
long='diff',
kwargs={'same' : False, 'cutoff' : CONSERVATIVE_CUTOFF},
description='Only show files that are different'),
Option(short='n',
long='name',
kwargs={'filter_by_name' : True},
description='Only compare files whose base names are the same'),
Option(short='L',
long='symlinks',
kwargs={'symlinks' : True},
description="Don't ignore symlinks"),
]
KWARGS = [
binwalk.module.ModuleKwarg(name='cutoff', default=DEFAULT_CUTOFF),
binwalk.module.ModuleKwarg(name='strings', default=False),
binwalk.module.ModuleKwarg(name='same', default=True),
binwalk.module.ModuleKwarg(name='symlinks', default=False),
binwalk.module.ModuleKwarg(name='name', default=False),
binwalk.module.ModuleKwarg(name='max_results', default=None),
binwalk.module.ModuleKwarg(name='abspath', default=False),
binwalk.module.ModuleKwarg(name='matches', default={}),
binwalk.module.ModuleKwarg(name='types', default={}),
binwalk.module.ModuleKwarg(name='filter_by_name', default=False),
binwalk.module.ModuleKwarg(name='symlinks', default=False),
Kwarg(name='cutoff', default=DEFAULT_CUTOFF),
Kwarg(name='strings', default=False),
Kwarg(name='same', default=True),
Kwarg(name='symlinks', default=False),
Kwarg(name='name', default=False),
Kwarg(name='max_results', default=None),
Kwarg(name='abspath', default=False),
Kwarg(name='matches', default={}),
Kwarg(name='types', default={}),
Kwarg(name='filter_by_name', default=False),
Kwarg(name='symlinks', default=False),
]
# Requires libfuzzy.so
......@@ -89,21 +89,6 @@ class HashMatch(binwalk.module.Module):
RESULT = ["percentage", "description"]
def init(self):
'''
Class constructor.
@cutoff - The fuzzy cutoff which determines if files are different or not.
@strings - Only hash strings inside of the file, not the entire file itself.
@same - Set to True to show files that are the same, False to show files that are different.
@symlinks - Set to True to include symbolic link files.
@name - Set to True to only compare files whose base names match.
@max_results - Stop searching after x number of matches.
@abspath - Set to True to display absolute file paths.
@matches - A dictionary of file names to diff.
@types - A dictionary of file types to diff.
Returns None.
'''
self.total = 0
self.last_file1 = HashResult(None)
self.last_file2 = HashResult(None)
......@@ -118,7 +103,7 @@ class HashMatch(binwalk.module.Module):
self.types[k][i] = re.compile(self.types[k][i])
def _get_strings(self, fname):
return ''.join(list(binwalk.common.strings(fname, minimum=10)))
return ''.join(list(binwalk.core.common.strings(fname, minimum=10)))
def _show_result(self, match, fname):
if self.abspath:
......
......@@ -2,12 +2,12 @@ import os
import sys
import curses
import platform
import binwalk.module
import binwalk.common as common
from binwalk.compat import *
import binwalk.core.common as common
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg
# TODO: This code is an effing mess.
class HexDiff(binwalk.module.Module):
class HexDiff(Module):
ALL_SAME = 0
ALL_DIFF = 1
......@@ -25,33 +25,33 @@ class HexDiff(binwalk.module.Module):
TITLE = "Binary Diffing"
CLI = [
binwalk.module.ModuleOption(short='W',
long='hexdump',
kwargs={'enabled' : True},
description='Perform a hexdump / diff of a file or files'),
binwalk.module.ModuleOption(short='G',
long='green',
kwargs={'show_green' : True, 'show_blue' : False, 'show_red' : False},
description='Only show lines containing bytes that are the same among all files'),
binwalk.module.ModuleOption(short='i',
long='red',
kwargs={'show_red' : True, 'show_blue' : False, 'show_green' : False},
description='Only show lines containing bytes that are different among all files'),
binwalk.module.ModuleOption(short='U',
long='blue',
kwargs={'show_blue' : True, 'show_red' : False, 'show_green' : False},
description='Only show lines containing bytes that are different among some files'),
binwalk.module.ModuleOption(short='w',
long='terse',
kwargs={'terse' : True},
description='Diff all files, but only display a hex dump of the first file'),
Option(short='W',
long='hexdump',
kwargs={'enabled' : True},
description='Perform a hexdump / diff of a file or files'),
Option(short='G',
long='green',
kwargs={'show_green' : True, 'show_blue' : False, 'show_red' : False},
description='Only show lines containing bytes that are the same among all files'),
Option(short='i',
long='red',
kwargs={'show_red' : True, 'show_blue' : False, 'show_green' : False},
description='Only show lines containing bytes that are different among all files'),
Option(short='U',
long='blue',
kwargs={'show_blue' : True, 'show_red' : False, 'show_green' : False},
description='Only show lines containing bytes that are different among some files'),
Option(short='w',
long='terse',
kwargs={'terse' : True},
description='Diff all files, but only display a hex dump of the first file'),
]
KWARGS = [
binwalk.module.ModuleKwarg(name='show_red', default=True),
binwalk.module.ModuleKwarg(name='show_blue', default=True),
binwalk.module.ModuleKwarg(name='show_green', default=True),
binwalk.module.ModuleKwarg(name='terse', default=False),
Kwarg(name='show_red', default=True),
Kwarg(name='show_blue', default=True),
Kwarg(name='show_green', default=True),
Kwarg(name='terse', default=False),
]
HEADER_FORMAT = "\n%s\n"
......
import magic
import binwalk.module
import binwalk.parser
import binwalk.filter
import binwalk.smartsignature
from binwalk.compat import *
import binwalk.core.parser
import binwalk.core.filter
import binwalk.core.smartsignature
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg
class Signature(binwalk.module.Module):
class Signature(Module):
TITLE = "Signature Scan"
CLI = [
binwalk.module.ModuleOption(short='B',
long='signature',
kwargs={'enabled' : True},
description='Scan target file(s) for file signatures'),
binwalk.module.ModuleOption(short='m',
long='magic',
kwargs={'magic_files' : []},
type=list,
dtype='file',
description='Specify a custom magic file to use'),
binwalk.module.ModuleOption(short='R',
long='raw-bytes',
kwargs={'raw_bytes' : None},
type=str,
description='Specify a sequence of bytes to search for'),
binwalk.module.ModuleOption(short='b',
long='dumb',
kwargs={'dumb_scan' : True},
description='Disable smart signature keywords'),
binwalk.module.ModuleOption(short='I',
long='show-invalid',
kwargs={'show_invalid' : True},
description='Show results marked as invalid'),
binwalk.module.ModuleOption(short='x',
long='exclude',
kwargs={'exclude_filters' : []},
type=list,
dtype=str.__name__,
description='Exclude results that match <str>'),
binwalk.module.ModuleOption(short='y',
long='include',
kwargs={'include_filters' : []},
type=list,
dtype=str.__name__,
description='Only show results that match <str>'),
Option(short='B',
long='signature',
kwargs={'enabled' : True},
description='Scan target file(s) for file signatures'),
Option(short='m',
long='magic',
kwargs={'magic_files' : []},
type=list,
dtype='file',
description='Specify a custom magic file to use'),
Option(short='R',
long='raw-bytes',
kwargs={'raw_bytes' : None},
type=str,
description='Specify a sequence of bytes to search for'),
Option(short='b',
long='dumb',
kwargs={'dumb_scan' : True},
description='Disable smart signature keywords'),
Option(short='I',
long='show-invalid',
kwargs={'show_invalid' : True},
description='Show results marked as invalid'),
Option(short='x',
long='exclude',
kwargs={'exclude_filters' : []},
type=list,
dtype=str.__name__,
description='Exclude results that match <str>'),
Option(short='y',
long='include',
kwargs={'include_filters' : []},
type=list,
dtype=str.__name__,
description='Only show results that match <str>'),
]
KWARGS = [
binwalk.module.ModuleKwarg(name='enabled', default=False),
binwalk.module.ModuleKwarg(name='dumb_scan', default=False),
binwalk.module.ModuleKwarg(name='show_invalid', default=False),
binwalk.module.ModuleKwarg(name='raw_bytes', default=None),
binwalk.module.ModuleKwarg(name='magic_files', default=[]),
binwalk.module.ModuleKwarg(name='exclude_filters', default=[]),
binwalk.module.ModuleKwarg(name='include_filters', default=[]),
Kwarg(name='enabled', default=False),
Kwarg(name='dumb_scan', default=False),
Kwarg(name='show_invalid', default=False),
Kwarg(name='raw_bytes', default=None),
Kwarg(name='magic_files', default=[]),
Kwarg(name='exclude_filters', default=[]),
Kwarg(name='include_filters', default=[]),
]
HEADER = ["DECIMAL", "HEX", "DESCRIPTION"]
......@@ -67,9 +67,9 @@ class Signature(binwalk.module.Module):
def init(self):
# Create SmartSignature and MagicParser class instances. These are mostly for internal use.
self.filter = binwalk.filter.MagicFilter()
self.smart = binwalk.smartsignature.SmartSignature(self.filter, ignore_smart_signatures=self.dumb_scan)
self.parser = binwalk.parser.MagicParser(self.filter, self.smart)
self.filter = binwalk.core.filter.MagicFilter()
self.smart = binwalk.core.smartsignature.SmartSignature(self.filter, ignore_smart_signatures=self.dumb_scan)
self.parser = binwalk.core.parser.MagicParser(self.filter, self.smart)
# Set any specified include/exclude filters
for regex in self.exclude_filters:
......
import ctypes
import ctypes.util
from binwalk.common import *
from binwalk.core.common import *
class Plugin:
'''
......
from binwalk.plugin import *
class Plugin:
'''
Ensures that ASCII CPIO archive entries only get extracted once.
......
import os
import shutil
from binwalk.compat import *
from binwalk.common import BlockFile
from binwalk.core.compat import *
from binwalk.core.common import BlockFile
class Plugin:
'''
......@@ -15,22 +15,23 @@ class Plugin:
def __init__(self, module):
self.original_cmd = ''
self.enabled = (module.name == 'Signature')
self.module = module
#if module.extractor.enabled:
if self.enabled:
# Replace the existing LZMA extraction command with our own
# rules = self.binwalk.extractor.get_rules()
# for i in range(0, len(rules)):
# if rules[i]['regex'].match(self.SIGNATURE):
# self.original_cmd = rules[i]['cmd']
# rules[i]['cmd'] = self.lzma_cable_extractor
# break
rules = self.extractor.get_rules()
for i in range(0, len(rules)):
if rules[i]['regex'].match(self.SIGNATURE):
self.original_cmd = rules[i]['cmd']
rules[i]['cmd'] = self.lzma_cable_extractor
break
def lzma_cable_extractor(self, fname):
# Try extracting the LZMA file without modification first
if not self.binwalk.extractor.execute(self.original_cmd, fname):
if not self.module.extractor.execute(self.original_cmd, fname):
out_name = os.path.splitext(fname)[0] + '-patched' + os.path.splitext(fname)[1]
fp_out = BlockFile(out_name, 'w')
fp_in = BlockFile(fname)
fp_in = BlockFile(fname, swap=self.module.config.swap_size)
fp_in.MAX_TRAILING_SIZE = 0
i = 0
......@@ -51,11 +52,11 @@ class Plugin:
# Overwrite the original file so that it can be cleaned up if -r was specified
shutil.move(out_name, fname)
self.binwalk.extractor.execute(self.original_cmd, fname)
self.module.extractor.execute(self.original_cmd, fname)
def scan(self, result):
# The modified cable modem LZMA headers all have valid dictionary sizes and a properties byte of 0x5D.
if result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if self.enabled and result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if "properties: 0x5D" in result.description and "invalid dictionary size" not in result.description:
result.valid = True
result.description = result.description.split("invalid uncompressed size")[0] + "missing uncompressed size"
......
......@@ -7,11 +7,12 @@ class Plugin:
Searches for and validates zlib compressed data.
'''
MIN_DECOMP_SIZE = 16*1024
MIN_DECOMP_SIZE = 16 * 1024
MAX_DATA_SIZE = 33 * 1024
def __init__(self, module):
self.tinfl = None
self.module = module
# Only initialize this plugin if this is a signature scan
if module.name == 'Signature':
......@@ -22,8 +23,7 @@ class Plugin:
# If this result is a zlib signature match, try to decompress the data
if self.tinfl and result.file and result.description.lower().startswith('zlib'):
# Seek to and read the suspected zlib data
fd = BlockFile(result.file.name, "r")
fd.seek(result.offset)
fd = BlockFile(result.file.name, offset=result.offset, swap=self.module.config.swap_size)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
......
import binwalk.entropy as entropy
import binwalk.plugins as plugins
import binwalk.common as common
from binwalk.compat import *
class FileStrings(object):
    '''
    Class for performing a "smart" strings analysis on a single file.
    It is preferred to use the Strings class instead of this class directly.
    '''

    # Strings at or below this length get the stricter special-character ratio
    SUSPECT_STRING_LENGTH = 4
    SUSPECT_SPECIAL_CHARS_RATIO = .25
    MIN_STRING_LENGTH = 3
    # Strings at or above this length are automatically considered valid
    MAX_STRING_LENGTH = 20
    MAX_SPECIAL_CHARS_RATIO = .4
    # Blocks whose measured entropy exceeds this are skipped (see _read_block)
    MAX_ENTROPY = 0.9
    DEFAULT_ENTROPY_BLOCK = 1024

    # string.ascii_letters rather than string.letters: the latter is Python 2
    # only (and locale-dependent), so this keeps the class importable on Python 3.
    LETTERS = set(string.ascii_letters)
    NUMBERS = set(string.digits)
    PRINTABLE = set(string.printable)
    WHITESPACE = set(string.whitespace)
    PUNCTUATION = set(string.punctuation)
    NEWLINES = set(['\r', '\n', '\x0b', '\x0c'])
    VOWELS = set(['A', 'E', 'I', 'O', 'U', 'a', 'e', 'i', 'o', 'u'])
    # Special characters that are forgiven in longer strings (paths, format
    # strings, file names, etc)
    NON_ALPHA_EXCEPTIONS = set(['%', '.', '/', '-', '_'])
    # Open/close bracket pairs; a string enclosed by one of these pairs is
    # unlikely to be random garbage (see _is_bracketed)
    BRACKETED = {
        '[' : ']',
        '<' : '>',
        '{' : '}',
        '(' : ')',
    }

    def __init__(self, file_name, binwalk=None, length=0, offset=0, n=MIN_STRING_LENGTH, block=DEFAULT_ENTROPY_BLOCK, algorithm='gzip', plugins=None):
        '''
        Class constructor. Preferred to be invoked from the Strings class instead of directly.

        @file_name - The file name to perform a strings analysis on.
        @binwalk   - An instance of the Binwalk class.
        @length    - The number of bytes in the file to analyze.
        @offset    - The starting offset into the file to begin analysis.
        @n         - The minimum valid string length.
        @block     - The block size to use when performing entropy analysis. Set to None to skip entropy analysis.
        @algorithm - The entropy algorithm to use when performing entropy analysis.
        @plugins   - An instance of the Plugins class.

        Returns None.
        '''
        self.n = n
        self.binwalk = binwalk
        self.length = length
        self.start = offset
        self.data = ''
        self.dlen = 0
        self.i = 0
        self.total_read = 0
        self.entropy = {}
        self.valid_strings = []
        self.external_validators = []
        self.plugins = plugins
        self.block = block

        if not self.n:
            self.n = self.MIN_STRING_LENGTH

        if self.block is not None:
            # Perform an entropy analysis over the entire file (anything less may generate poor entropy data).
            # Give fake file results list to prevent FileEntropy from doing too much analysis.
            with entropy.FileEntropy(file_name, block=self.block, file_results=['foo']) as e:
                (self.x, self.y, self.average_entropy) = e.analyze(algorithm=algorithm)
                for i in range(0, len(self.x)):
                    self.entropy[self.x[i]] = self.y[i]
                # Make sure our block size matches the entropy analysis's block size
                self.block = e.block
            # Make sure the starting offset is a multiple of the block size; else, when later checking
            # the entropy analysis, block offsets won't line up.
            self.start -= (self.start % self.block)
        else:
            i = 0
            self.block = common.BlockFile.READ_BLOCK_SIZE
            # No real entropy data in this branch, so provide a floor value for
            # the comparison in _read_block (it was previously never assigned
            # here, which raised AttributeError for multi-block files).
            self.average_entropy = 0.0

            # Fake the entropy scan. Fake entries must be *below* MAX_ENTROPY,
            # otherwise _read_block would treat every block as high-entropy and
            # skip it; 0.0 guarantees every block is read.
            while i < common.file_size(file_name):
                self.entropy[i] = 0.0
                i += self.block

        self.fd = common.BlockFile(file_name, 'r', length=length, offset=self.start)
        # TODO: This is not optimal. We should read in larger chunks and process it into self.block chunks.
        self.fd.READ_BLOCK_SIZE = self.block
        self.fd.MAX_TRAILING_SIZE = 0
        self.start = self.fd.offset

        # Set the total_scanned and scan_length values for plugins and status display messages
        if self.binwalk:
            self.binwalk.total_scanned = 0
            self.binwalk.scan_length = self.fd.length

    def __enter__(self):
        return self

    def __del__(self):
        self.cleanup()

    def __exit__(self, t, v, traceback):
        self.cleanup()

    def cleanup(self):
        '''
        Close the underlying file object. Safe to call multiple times.
        '''
        try:
            self.fd.close()
        except Exception:
            # self.fd may not exist if __init__ failed part way through
            pass

    def _read_block(self):
        '''
        Read one block of data from the target file, skipping over blocks whose
        entropy is above average / above MAX_ENTROPY (unless they border an
        entropy rising/falling edge).

        Returns a tuple of (offset, data_length, data).
        '''
        offset = self.total_read + self.start

        # Ignore blocks which have a higher than average or higher than MAX_ENTROPY entropy
        while has_key(self.entropy, offset):
            # Don't ignore blocks that border on an entropy rising/falling edge
            try:
                if self.entropy[offset-self.block] <= self.MAX_ENTROPY:
                    break
                if self.entropy[offset+self.block] <= self.MAX_ENTROPY:
                    break
            except KeyError:
                # No neighboring entropy entry (first/last block) - read it
                break

            if self.entropy[offset] > self.average_entropy or self.entropy[offset] > self.MAX_ENTROPY:
                # Skip this block entirely and seek past it
                self.total_read += self.block
                offset = self.total_read + self.start
                self.fd.seek(offset)
            else:
                break

        (data, dlen) = self.fd.read_block()
        if self.binwalk:
            self.binwalk.total_scanned = self.total_read
        self.total_read += dlen

        return (self.start+self.total_read-dlen, dlen, data)

    def _next_byte(self):
        '''
        Grab the next byte from the file.

        Returns a tuple of (offset, byte). The byte is an empty string at EOF.
        '''
        byte = ''

        # If we've reached the end of the data buffer that we previously read in, read in the next block of data
        if self.i == self.dlen:
            (self.current_offset, self.dlen, self.data) = self._read_block()
            self.i = 0

        if self.i < self.dlen:
            byte = self.data[self.i]
            self.i += 1

        return (self.current_offset+self.i-1, byte)

    def _has_vowels(self, data):
        '''
        Returns True if data has a vowel in it, otherwise returns False.
        '''
        for vowel in self.VOWELS:
            if vowel in data:
                return True
        return False

    def _alpha_count(self, data):
        '''
        Returns the number of english letters in data.
        '''
        c = 0
        for char in data:
            if char in self.LETTERS:
                c += 1
        return c

    def _is_bracketed(self, data):
        '''
        Checks if a string is bracketed by special characters.

        @data - The data string to check.

        Returns True if bracketed, False if not.
        '''
        return has_key(self.BRACKETED, data[0]) and data.endswith(self.BRACKETED[data[0]])

    def _non_alpha_count(self, data):
        '''
        Returns the number of non-alphanumeric characters in data.
        '''
        count = 0
        dlen = len(data)

        # No exceptions for very short strings
        if dlen <= self.SUSPECT_STRING_LENGTH:
            exceptions = []
        else:
            exceptions = set(self.NON_ALPHA_EXCEPTIONS)

        alphanumeric = self.LETTERS | self.NUMBERS
        for char in data:
            if char not in alphanumeric and char not in exceptions:
                count += 1
        return count

    def _too_many_special_chars(self, data):
        '''
        Returns True if the ratio of special characters in data is too high, otherwise returns False.
        '''
        # If an open bracket exists, we expect a close bracket as well
        for (key, value) in iterator(self.BRACKETED):
            if key in data and not value in data:
                return True

        # For better filtering of false positives, require a lower ratio of special characters for very short strings
        if len(data) <= self.SUSPECT_STRING_LENGTH:
            return (float(self._non_alpha_count(data)) / len(data)) >= self.SUSPECT_SPECIAL_CHARS_RATIO

        return (float(self._non_alpha_count(data)) / len(data)) >= self.MAX_SPECIAL_CHARS_RATIO

    def _fails_grammar_rules(self, data):
        '''
        Returns True if data fails one of several general grammatical/logical rules.
        '''
        # Nothing here is going to be perfect and will likely result in both false positives and false negatives.
        # The goal however is not to be perfect, but to filter out as many garbage strings while generating as
        # few false negatives as possible.

        # Generally, the first byte of a string is not a punctuation mark
        if data[0] in self.PUNCTUATION:
            return True

        # Some punctuation may be generally considered OK if found at the end of a string; others are very unlikely
        if data[-1] in self.PUNCTUATION and data[-1] not in ['.', '?', ',', '!', '>', '<', '|', '&']:
            return True

        # Explicit bounds checks instead of the previous try/except IndexError
        # probing; out-of-range look-aheads are simply not applied.
        dlen = len(data)
        for i in range(0, dlen):
            # Q's must be followed by U's
            if data[i] in ['q', 'Q'] and i+1 < dlen and data[i+1] not in ['u', 'U']:
                return True

            if i+2 < dlen:
                # Three characters in a row are the same? Unlikely.
                if data[i] == data[i+1] == data[i+2]:
                    return True

                # Three punctuation marks in a row? Unlikely.
                if data[i] in self.PUNCTUATION and data[i+1] in self.PUNCTUATION and data[i+2] in self.PUNCTUATION:
                    return True

        return False

    def _is_valid(self, offset, string):
        '''
        Determines if a particular string is "valid" or not.

        @offset - The offset at which the string was found.
        @string - The string in question.

        Returns True if the string is valid, False if invalid.
        '''
        strlen = len(string)

        # External validators get first say; a non-None return is authoritative
        for callback in self.external_validators:
            r = callback(offset, string)
            if r is not None:
                return r

        # Large strings are automatically considered valid/interesting
        if strlen >= self.MAX_STRING_LENGTH:
            return True
        elif strlen >= self.n:
            # The chances of a random string being bracketed is pretty low.
            # If the string is bracketed, consider it valid.
            if self._is_bracketed(string):
                return True
            # Else, do some basic sanity checks on the string
            elif self._has_vowels(string):
                if not self._too_many_special_chars(string):
                    if not self._fails_grammar_rules(string):
                        return True

        return False

    def _add_string(self, offset, string, plug_pre):
        '''
        Adds a string to the list of valid strings if it passes several rules.
        Also responsible for calling plugin and display callback functions.

        @offset   - The offset at which the string was found.
        @string   - The string that was found.
        @plug_pre - Return value from plugin pre-scan callback functions.

        Returns the value from the plugin callback functions.
        '''
        plug_ret = plugins.PLUGIN_CONTINUE

        string = string.strip()

        if self._is_valid(offset, string):
            results = {'description' : string, 'offset' : offset}

            if self.plugins:
                plug_ret = self.plugins._scan_callbacks(results)
                # Plugins may rewrite the offset/description
                offset = results['offset']
                string = results['description']

            if not ((plug_ret | plug_pre ) & plugins.PLUGIN_NO_DISPLAY):
                if self.binwalk:
                    self.binwalk.display.results(offset, [results])
                self.valid_strings.append((offset, string))

        return plug_ret

    def strings(self):
        '''
        Perform a strings analysis on the target file.

        Returns a list of tuples consisting of [(offset, string), (offset, string), ...].
        '''
        string = ''
        string_start = 0
        plugin_pre = plugins.PLUGIN_CONTINUE
        plugin_ret = plugins.PLUGIN_CONTINUE

        if self.plugins:
            plugin_pre = self.plugins._pre_scan_callbacks(self.fd)

        while not ((plugin_pre | plugin_ret) & plugins.PLUGIN_TERMINATE):
            (byte_offset, byte) = self._next_byte()

            # If the returned byte is NULL, try to add whatever string we have now and quit
            if not byte:
                self._add_string(string_start, string, plugin_pre)
                break

            # End of string is signified by a non-printable character or a new line
            if byte in self.PRINTABLE and byte not in self.NEWLINES:
                if not string:
                    string_start = byte_offset
                string += byte
            else:
                plugin_ret = self._add_string(string_start, string, plugin_pre)
                string = ''

        if self.plugins:
            self.plugins._post_scan_callbacks(self.fd)

        return self.valid_strings
class Strings(object):
    '''
    Class for performing a strings analysis against a list of files.
    '''

    def __init__(self, file_names, binwalk=None, length=0, offset=0, n=0, block=0, algorithm=None, load_plugins=True, whitelist=None, blacklist=None):
        '''
        Class constructor.

        @file_names   - A list of files to analyze.
        @binwalk      - An instance of the Binwalk class.
        @length       - The number of bytes in the file to analyze.
        @offset       - The starting offset into the file to begin analysis.
        @n            - The minimum valid string length.
        @block        - The block size to use when performing entropy analysis.
        @algorithm    - The entropy algorithm to use when performing entropy analysis.
        @load_plugins - Set to False to disable plugin callbacks.
        @whitelist    - A list of whitelisted plugins.
        @blacklist    - A list of blacklisted plugins.

        Returns None.
        '''
        # None defaults avoid the shared-mutable-default-argument pitfall;
        # passing an explicit list behaves exactly as before.
        if whitelist is None:
            whitelist = []
        if blacklist is None:
            blacklist = []

        self.file_names = file_names
        self.binwalk = binwalk
        self.length = length
        self.offset = offset
        self.n = n
        self.block = block
        self.algorithm = algorithm
        self.file_strings = None
        self.plugins = None

        if self.binwalk:
            self.binwalk.scan_type = self.binwalk.STRINGS

            # NOTE(review): plugins are only loaded when a Binwalk instance is
            # supplied (Plugins needs one for its callbacks) -- confirm against
            # upstream, as the original indentation was ambiguous.
            if load_plugins:
                self.plugins = plugins.Plugins(self.binwalk, whitelist=whitelist, blacklist=blacklist)

    def __enter__(self):
        return self

    def __exit__(self, t, v, traceback):
        return None

    def add_validator(self, callback):
        '''
        Add a validation function to be invoked when determining if a string is valid or not.
        Validators are passed two arguments: the string offset and the string in question.
        Validators may return:

            o True  - The string is valid, stop further analysis.
            o False - The string is not valid, stop further analysis.
            o None  - Unknown, continue analysis.

        @callback - The validation function.

        Returns None.
        '''
        # No-op unless a FileStrings scan is currently in progress
        if self.file_strings:
            self.file_strings.external_validators.append(callback)

    def strings(self):
        '''
        Perform a "smart" strings analysis against the target files.

        Returns a dictionary compatible with other classes (Entropy, Binwalk, etc):

            {
                'file_name' : (offset, [{
                    'description' : 'Strings',
                    'string'      : 'found_string'
                }])
            }
        '''
        results = {}

        if self.plugins:
            self.plugins._load_plugins()

        for file_name in self.file_names:
            if self.binwalk:
                self.binwalk.display.header(file_name=file_name, description='Strings')

            results[file_name] = []

            self.file_strings = FileStrings(file_name, self.binwalk, self.length, self.offset, self.n, block=self.block, algorithm=self.algorithm, plugins=self.plugins)

            for (offset, string) in self.file_strings.strings():
                results[file_name].append((offset, [{'description' : 'Strings', 'string' : string}]))

            del self.file_strings
            self.file_strings = None

            # Guard against a None binwalk instance: the header() call above was
            # guarded but footer() was not, which raised AttributeError when no
            # Binwalk instance was supplied.
            if self.binwalk:
                self.binwalk.display.footer()

        # Drop the plugins reference (triggers its cleanup) without deleting the
        # attribute outright, so a second strings() call doesn't AttributeError.
        if self.plugins:
            self.plugins = None

        return results
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment