Commit 1a117715 by devttys0

Initial move of hexdiff.py to modules.

parent aba30b02
......@@ -47,34 +47,71 @@ class ModuleKwarg(object):
def process_kwargs(module, kwargs):
return Modules(dummy=True).kwargs(module, kwargs)
def show_help():
print Modules(dummy=True).help()
class Modules(object):
def __init__(self, dummy=False):
self.config = None
self.dependency_results = {}
if not dummy:
from binwalk.modules.configuration import Configuration
self.config = self.load(Configuration)
def list(self):
def list(self, attribute="run"):
import binwalk.modules
objects = []
for (name, obj) in inspect.getmembers(binwalk.modules):
if inspect.isclass(obj) and hasattr(obj, "run"):
if inspect.isclass(obj) and hasattr(obj, attribute):
objects.append(obj)
return objects
def help(self):
help_string = ""
for obj in self.list(attribute="CLI"):
help_string += "\n%s Options:\n" % obj.NAME
for module_option in obj.CLI:
if module_option.long:
long_opt = '--' + module_option.long
if module_option.nargs > 0:
optargs = "=%s" % str(module_option.type)
else:
optargs = ""
if module_option.short:
short_opt = "-" + module_option.short + ","
else:
short_opt = " "
fmt = " %%s %%s%%-%ds%%s\n" % (32-len(long_opt))
help_string += fmt % (short_opt, long_opt, optargs, module_option.description)
return help_string
def execute(self):
results = {}
for module in self.list():
result = self.run(module)
if result is not None:
results[module] = result
return results
def run(self, module):
results = None
obj = self.load()
obj = self.load(module)
if obj.enabled:
try:
obj.run()
except AttributeError:
pass
results = obj.run()
except AttributeError as e:
print("WARNING:", e)
return results
......@@ -93,8 +130,10 @@ class Modules(object):
self.config.display.log = False
self.config.display.quiet = True
for (kwarg, mod) in module.DEPENDS:
kwargs[kwarg] = self.run(mod)
for (kwarg, mod) in iterator(module.DEPENDS):
if not has_key(self.dependency_results, mod):
self.dependency_results[mod] = self.run(mod)
kwargs[kwarg] = self.dependency_results[mod]
self.config.display.log = orig_log
self.config.display.quiet = orig_quiet
......@@ -158,9 +197,9 @@ class Modules(object):
# Do this manually as argparse doesn't seem to be able to handle hexadecimal values
if module_option.type == int:
kwargs[name] = int(kwargs[name], 0)
kwargs[name] = int(value, 0)
elif module_option.type == float:
kwargs[name] = float(kwargs[name])
kwargs[name] = float(value)
elif module_option.type == dict:
if not has_key(kwargs, name):
kwargs[name] = {}
......@@ -177,13 +216,6 @@ class Modules(object):
if self.config is not None and not has_key(kwargs, 'config'):
kwargs['config'] = self.config
# If some command line arguments for this module were parsed, then set it to enabled.
# Else, disable it by default.
if kwargs:
kwargs['enabled'] = True
else:
kwargs['enabled'] = False
return kwargs
def kwargs(self, module, kwargs):
......@@ -204,8 +236,12 @@ class Modules(object):
setattr(module, module_argument.name, arg_value)
if has_key(kwargs, 'config'):
setattr(module, 'config', kwargs['config'])
for (k, v) in iterator(kwargs):
if not hasattr(module, k):
setattr(module, k, v)
if not hasattr(module, 'enabled'):
setattr(module, 'enabled', False)
else:
raise Exception("binwalk.module.process_kwargs: %s has no attribute 'KWARGS'" % str(module))
from configuration import Configuration
from hashmatch import HashFile, HashFiles, HashDirectories
from hashmatch import HashMatch
from binvis import Plotter
from hexdiff import HexDiff
......@@ -3,9 +3,12 @@ import sys
import binwalk.common
import binwalk.module
import binwalk.display
from binwalk.config import *
from binwalk.compat import *
class Configuration(object):
NAME = "General"
CLI = [
binwalk.module.ModuleOption(long='length',
short='l',
......@@ -19,6 +22,12 @@ class Configuration(object):
type=int,
kwargs={'offset' : 0},
description='Start scan at this file offset'),
binwalk.module.ModuleOption(long='block',
short='K',
nargs=1,
type=int,
kwargs={'block' : 0},
description='Set file block size'),
binwalk.module.ModuleOption(long='grep',
short='g',
nargs=1,
......@@ -51,15 +60,24 @@ class Configuration(object):
type=list,
kwargs={'verbose' : True},
description='Enable verbose output (specify twice for more verbosity)'),
binwalk.module.ModuleOption(short='h',
long='help',
kwargs={'show_help' : True},
description='Show help output'),
binwalk.module.ModuleOption(long=None,
short=None,
type=binwalk.common.BlockFile,
kwargs={'target_files' : []}),
binwalk.module.ModuleOption(short='u',
long='update',
kwargs={'do_update' : True},
description='Update magic signature files'),
]
KWARGS = [
binwalk.module.ModuleKwarg(name='length', default=None),
binwalk.module.ModuleKwarg(name='offset', default=None),
binwalk.module.ModuleKwarg(name='length', default=0),
binwalk.module.ModuleKwarg(name='offset', default=0),
binwalk.module.ModuleKwarg(name='block', default=0),
binwalk.module.ModuleKwarg(name='log_file', default=None),
binwalk.module.ModuleKwarg(name='csv', default=False),
binwalk.module.ModuleKwarg(name='format_to_terminal', default=False),
......@@ -69,11 +87,21 @@ class Configuration(object):
binwalk.module.ModuleKwarg(name='debug_verbose', default=False),
binwalk.module.ModuleKwarg(name='skip_unopened', default=False),
binwalk.module.ModuleKwarg(name='target_files', default=[]),
binwalk.module.ModuleKwarg(name='show_help', default=False),
binwalk.module.ModuleKwarg(name='do_update', default=False),
]
def __init__(self, **kwargs):
binwalk.module.process_kwargs(self, kwargs)
if self.show_help:
binwalk.module.show_help()
sys.exit(0)
if self.do_update:
Update(self.verbose).update()
sys.exit(0)
self._test_target_files()
self._set_verbosity()
......@@ -123,3 +151,118 @@ class Configuration(object):
plural = ''
raise Exception("Failed to open %d file%s for scanning" % (failed_open_count, plural))
class Update(object):
'''
Class for updating binwalk configuration and signatures files from the subversion trunk.
Example usage:
from binwalk import Update
Update().update()
'''
BASE_URL = "https://raw.github.com/devttys0/binwalk/master/src/binwalk/"
MAGIC_PREFIX = "magic/"
CONFIG_PREFIX = "config/"
def __init__(self, verbose):
'''
Class constructor.
@verbose - Verbose flag.
Returns None.
'''
self.verbose = verbose
self.config = Config()
def update(self):
'''
Updates all system wide signatures and config files.
Returns None.
'''
self.update_binwalk()
self.update_bincast()
self.update_binarch()
self.update_extract()
self.update_zlib()
self.update_compressd()
def _do_update_from_svn(self, prefix, fname):
'''
Updates the specified file to the latest version of that file in SVN.
@prefix - The URL subdirectory where the file is located.
@fname - The name of the file to update.
Returns None.
'''
# Get the local http proxy, if any
# csoban.kesmarki
proxy_url = os.getenv('HTTP_PROXY')
if proxy_url:
proxy_support = urllib2.ProxyHandler({'http' : proxy_url})
opener = urllib2.build_opener(proxy_support)
urllib2.install_opener(opener)
url = self.BASE_URL + prefix + fname
try:
if self.verbose:
print("Fetching %s..." % url)
data = urllib2.urlopen(url).read()
open(self.config.paths['system'][fname], "wb").write(data)
except Exception as e:
raise Exception("Update._do_update_from_svn failed to update file '%s': %s" % (url, str(e)))
def update_binwalk(self):
'''
Updates the binwalk signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.BINWALK_MAGIC_FILE)
def update_bincast(self):
'''
Updates the bincast signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.BINCAST_MAGIC_FILE)
def update_binarch(self):
'''
Updates the binarch signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.BINARCH_MAGIC_FILE)
def update_zlib(self):
'''
Updates the zlib signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.ZLIB_MAGIC_FILE)
def update_compressd(self):
'''
Updates the compress'd signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.COMPRESSD_MAGIC_FILE)
def update_extract(self):
'''
Updates the extract.conf file.
Returns None.
'''
self._do_update_from_svn(self.CONFIG_PREFIX, self.config.EXTRACT_FILE)
......@@ -27,7 +27,12 @@ class HashMatch(object):
DEFAULT_CUTOFF = 0
CONSERVATIVE_CUTOFF = 90
NAME = "Fuzzy Hash"
CLI = [
binwalk.module.ModuleOption(short='F',
long='fuzzy',
kwargs={'enabled' : True},
description='Perform fuzzy hash matching on files/directories'),
binwalk.module.ModuleOption(short='c',
long='cutoff',
nargs=1,
......@@ -43,7 +48,7 @@ class HashMatch(object):
long='same',
kwargs={'same' : True, 'cutoff' : CONSERVATIVE_CUTOFF},
description='Only show files that are the same'),
binwalk.module.ModuleOption(short='d',
binwalk.module.ModuleOption(short='',
long='diff',
kwargs={'same' : False, 'cutoff' : CONSERVATIVE_CUTOFF},
description='Only show files that are different'),
......@@ -91,9 +96,6 @@ class HashMatch(object):
'''
binwalk.module.process_kwargs(self, kwargs)
self.config.display.format_strings(self.HEADER_FORMAT, self.RESULT_FORMAT)
self.config.display.header(*self.HEADER)
self.total = 0
self.last_file1 = HashResult(None)
self.last_file2 = HashResult(None)
......@@ -262,17 +264,12 @@ class HashMatch(object):
return set(file_list)
class HashFiles(HashMatch):
def run(self):
def hash_files(self, needle, haystack):
'''
Compare one file against a list of other files.
Returns a list of tuple results.
'''
needle = self.config.target_files[0]
haystack = self.config.target_files[1:]
results = []
self.total = 0
......@@ -286,20 +283,14 @@ class HashFiles(HashMatch):
if self.max_results and self.total >= self.max_results:
break
self._print_footer()
return results
class HashFile(HashMatch):
def run(self):
def hash_file(self, needle, haystack):
'''
Search for one file inside one or more directories.
Returns a list of tuple results.
'''
needle = self.config.target_files[0]
haystack = self.config.target_files[1:]
matching_files = []
self.total = 0
done = False
......@@ -319,20 +310,14 @@ class HashFile(HashMatch):
if done:
break
self._print_footer()
return matching_files
class HashDirectories(HashMatch):
def run(self):
def hash_directories(self, needle, haystack):
'''
Compare the contents of one directory with the contents of other directories.
Returns a list of tuple results.
'''
needle = self.config.target_files[0]
haystack = self.config.target_files[1:]
done = False
results = []
self.total = 0
......@@ -359,6 +344,28 @@ class HashDirectories(HashMatch):
if done:
break
self._print_footer()
return results
def run(self):
'''
Main module method.
'''
results = None
needle = self.config.target_files[0]
haystack = self.config.target_files[1:]
self.config.display.format_strings(self.HEADER_FORMAT, self.RESULT_FORMAT)
self.config.display.header(*self.HEADER)
if os.path.isfile(needle):
if os.path.isfile(haystack[0]):
results = self.hash_files(needle, haystack)
else:
results = self.hash_file(needle, haystack)
else:
results = self.hash_directories(needle, haystack)
self.config.display.footer()
return results
import os
from binwalk.compat import *
from binwalk.common import BlockFile
class Plotter(object):
'''
Base class for plotting binaries in Qt.
Other plotter classes are derived from this.
'''
DIMENSIONS = 3
VIEW_DISTANCE = 1024
MAX_PLOT_POINTS = 25000
def __init__(self, files, offset=0, length=0, max_points=MAX_PLOT_POINTS, show_grids=False, verbose=False):
'''
Class constructor.
@files - A list of files to plot in the graph.
@offset - The starting offset for each file.
@length - The number of bytes to analyze from each file.
@max_points - The maximum number of data points to display.
@show_grids - Set to True to display x-y-z grids.
@verbse - Set to False to disable verbose print statements.
Returns None.
'''
import pyqtgraph.opengl as gl
from pyqtgraph.Qt import QtGui
self.verbose = verbose
self.show_grids = show_grids
self.files = files
self.offset = offset
self.length = length
if not max_points:
self.max_points = self.MAX_PLOT_POINTS
else:
self.max_points = max_points
self.app = QtGui.QApplication([])
self.window = gl.GLViewWidget()
self.window.opts['distance'] = self.VIEW_DISTANCE
if len(self.files) == 1:
self.window.setWindowTitle(self.files[0])
def _print(self, message):
'''
Print console messages. For internal use only.
'''
if self.verbose:
print (message)
def _generate_plot_points(self, data_points):
'''
Generates plot points from a list of data points.
@data_points - A dictionary containing each unique point and its frequency of occurance.
Returns a set of plot points.
'''
total = 0
min_weight = 0
weightings = {}
plot_points = {}
# If the number of data points exceeds the maximum number of allowed data points, use a
# weighting system to eliminate data points that occur less freqently.
if sum(data_points.itervalues()) > self.max_points:
# First, generate a set of weight values 1 - 10
for i in range(1, 11):
weightings[i] = 0
# Go through every data point and how many times that point occurs
for (point, count) in iterator(data_points):
# For each data point, compare it to each remaining weight value
for w in get_keys(weightings):
# If the number of times this data point occurred is >= the weight value,
# then increment the weight value. Since weight values are ordered lowest
# to highest, this means that more frequent data points also increment lower
# weight values. Thus, the more high-frequency data points there are, the
# more lower-frequency data points are eliminated.
if count >= w:
weightings[w] += 1
else:
break
# Throw out weight values that exceed the maximum number of data points
if weightings[w] > self.max_points:
del weightings[w]
# If there's only one weight value left, no sense in continuing the loop...
if len(weightings) == 1:
break
# The least weighted value is our minimum weight
min_weight = min(weightings)
# Get rid of all data points that occur less frequently than our minimum weight
for point in get_keys(data_points):
if data_points[point] < min_weight:
del data_points[point]
for point in sorted(data_points, key=data_points.get, reverse=True):
plot_points[point] = data_points[point]
total += 1
if total >= self.max_points:
break
return plot_points
def _generate_data_point(self, data):
'''
Subclasses must override this to return the appropriate data point.
@data - A string of data self.DIMENSIONS in length.
Returns a data point tuple.
'''
return (0,0,0)
def _generate_data_points(self, file_name):
'''
Generates a dictionary of data points and their frequency of occurrance.
@file_name - The file to generate data points from.
Returns a dictionary.
'''
i = 0
data_points = {}
self._print("Generating data points for %s" % file_name)
with BlockFile(file_name, 'r', offset=self.offset, length=self.length) as fp:
fp.MAX_TRAILING_SIZE = 0
while True:
(data, dlen) = fp.read_block()
if not data or not dlen:
break
i = 0
while (i+(self.DIMENSIONS-1)) < dlen:
point = self._generate_data_point(data[i:i+self.DIMENSIONS])
if has_key(data_points, point):
data_points[point] += 1
else:
data_points[point] = 1
i += 3
return data_points
def _generate_plot(self, plot_points):
import numpy as np
import pyqtgraph.opengl as gl
nitems = float(len(plot_points))
pos = np.empty((nitems, 3))
size = np.empty((nitems))
color = np.empty((nitems, 4))
i = 0
for (point, weight) in iterator(plot_points):
r = 0.0
g = 0.0
b = 0.0
pos[i] = point
frequency_percentage = (weight / nitems)
# Give points that occur more frequently a brighter color and larger point size.
# Frequency is determined as a percentage of total unique data points.
if frequency_percentage > .005:
size[i] = .20
r = 1.0
elif frequency_percentage > .002:
size[i] = .10
g = 1.0
r = 1.0
else:
size[i] = .05
g = 1.0
color[i] = (r, g, b, 1.0)
i += 1
scatter_plot = gl.GLScatterPlotItem(pos=pos, size=size, color=color, pxMode=False)
scatter_plot.translate(-127.5, -127.5, -127.5)
return scatter_plot
def plot(self, wait=True):
import pyqtgraph.opengl as gl
self.window.show()
if self.show_grids:
xgrid = gl.GLGridItem()
ygrid = gl.GLGridItem()
zgrid = gl.GLGridItem()
self.window.addItem(xgrid)
self.window.addItem(ygrid)
self.window.addItem(zgrid)
# Rotate x and y grids to face the correct direction
xgrid.rotate(90, 0, 1, 0)
ygrid.rotate(90, 1, 0, 0)
# Scale grids to the appropriate dimensions
xgrid.scale(12.8, 12.8, 12.8)
ygrid.scale(12.8, 12.8, 12.8)
zgrid.scale(12.8, 12.8, 12.8)
for file_name in self.files:
data_points = self._generate_data_points(file_name)
self._print("Generating plot points from %d data points" % len(data_points))
plot_points = self._generate_plot_points(data_points)
del data_points
self._print("Generating graph from %d plot points" % len(plot_points))
self.window.addItem(self._generate_plot(plot_points))
if wait:
self.wait()
def wait(self):
from pyqtgraph.Qt import QtCore, QtGui
t = QtCore.QTimer()
t.start(50)
QtGui.QApplication.instance().exec_()
class Plotter3D(Plotter):
'''
Plot data points within a 3D cube.
'''
DIMENSIONS = 3
def _generate_data_point(self, data):
return (ord(data[0]), ord(data[1]), ord(data[2]))
class Plotter2D(Plotter):
'''
Plot data points projected on each cube face.
'''
DIMENSIONS = 2
MAX_PLOT_POINTS = 12500
plane_count = -1
def _generate_data_point(self, data):
self.plane_count += 1
if self.plane_count > 5:
self.plane_count = 0
if self.plane_count == 0:
return (0, ord(data[0]), ord(data[1]))
elif self.plane_count == 1:
return (ord(data[0]), 0, ord(data[1]))
elif self.plane_count == 2:
return (ord(data[0]), ord(data[1]), 0)
elif self.plane_count == 3:
return (255, ord(data[0]), ord(data[1]))
elif self.plane_count == 4:
return (ord(data[0]), 255, ord(data[1]))
elif self.plane_count == 5:
return (ord(data[0]), ord(data[1]), 255)
if __name__ == '__main__':
import sys
try:
weight = int(sys.argv[2])
except:
weight = None
Plotter2D(sys.argv[1:], weight=weight, verbose=True).plot()
import os
from binwalk.config import *
from binwalk.compat import *
class Update:
'''
Class for updating binwalk configuration and signatures files from the subversion trunk.
Example usage:
from binwalk import Update
Update().update()
'''
BASE_URL = "https://raw.github.com/devttys0/binwalk/master/src/binwalk/"
MAGIC_PREFIX = "magic/"
CONFIG_PREFIX = "config/"
def __init__(self, verbose=False):
'''
Class constructor.
@verbose - Verbose flag.
Returns None.
'''
self.verbose = verbose
self.config = Config()
def update(self):
'''
Updates all system wide signatures and config files.
Returns None.
'''
self.update_binwalk()
self.update_bincast()
self.update_binarch()
self.update_extract()
self.update_zlib()
self.update_compressd()
def _do_update_from_svn(self, prefix, fname):
'''
Updates the specified file to the latest version of that file in SVN.
@prefix - The URL subdirectory where the file is located.
@fname - The name of the file to update.
Returns None.
'''
# Get the local http proxy, if any
# csoban.kesmarki
proxy_url = os.getenv('HTTP_PROXY')
if proxy_url:
proxy_support = urllib2.ProxyHandler({'http' : proxy_url})
opener = urllib2.build_opener(proxy_support)
urllib2.install_opener(opener)
url = self.BASE_URL + prefix + fname
try:
if self.verbose:
print("Fetching %s..." % url)
data = urllib2.urlopen(url).read()
open(self.config.paths['system'][fname], "wb").write(data)
except Exception as e:
raise Exception("Update._do_update_from_svn failed to update file '%s': %s" % (url, str(e)))
def update_binwalk(self):
'''
Updates the binwalk signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.BINWALK_MAGIC_FILE)
def update_bincast(self):
'''
Updates the bincast signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.BINCAST_MAGIC_FILE)
def update_binarch(self):
'''
Updates the binarch signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.BINARCH_MAGIC_FILE)
def update_zlib(self):
'''
Updates the zlib signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.ZLIB_MAGIC_FILE)
def update_compressd(self):
'''
Updates the compress'd signature file.
Returns None.
'''
self._do_update_from_svn(self.MAGIC_PREFIX, self.config.COMPRESSD_MAGIC_FILE)
def update_extract(self):
'''
Updates the extract.conf file.
Returns None.
'''
self._do_update_from_svn(self.CONFIG_PREFIX, self.config.EXTRACT_FILE)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment