Commit b16c8417 by devttys0

Replaced binwalk script with one using the new API.

parent e929bb00
#!/usr/bin/env python
import sys
import os.path
import binwalk
import binwalk.cmdopts
import binwalk.plotter
from binwalk.hashmatch import HashMatch
from binwalk.compat import *
import binwalk.modules
from threading import Thread
from getopt import GetoptError, gnu_getopt as GetOpt
def display_status():
global bwalk
while bwalk is not None:
def display_status(m):
# Display the current scan progress when the enter key is pressed.
while True:
try:
raw_input()
print("Progress: %.2f%% (%d / %d)\n" % (((float(bwalk.total_scanned) / float(bwalk.scan_length)) * 100), bwalk.total_scanned, bwalk.scan_length))
print("Progress: %.2f%% (%d / %d)\n" % (((float(m.status.completed) / float(m.status.total)) * 100), m.status.completed, m.status.total))
except KeyboardInterrupt as e:
raise e
except Exception:
pass
def examples():
name = os.path.basename(sys.argv[0])
print("""
Scanning firmware for file signatures:
\t$ %s firmware.bin
Extracting files from firmware:
\t$ %s -Me firmware.bin
Hueristic compression/encryption analysis:
\t$ %s -H firmware.bin
Scanning firmware for executable code:
\t$ %s -A firmware.bin
Performing a firmware strings analysis:
\t$ %s -S firmware.bin
Performing a firmware entropy analysis:
\t$ %s -E firmware.bin
Display identified file signatures on entropy graph:
\t$ %s -EB firmware.bin
Diffing multiple files:
\t$ %s -W firmware1.bin firmware2.bin firmware3.bin
Generating a 3D plot:
\t$ %s --3D firmware.bin
See http://binwalk.org/wiki/usage for more.
""" % (name, name, name, name, name, name, name, name, name))
sys.exit(0)
def main():
# The Binwalk class instance must be global so that the display_status thread can access it.
global bwalk
MIN_ARGC = 2
requested_scans = []
offset = 0
length = 0
strlen = 0
verbose = 0
matryoshka = 1
block_size = 0
failed_open_count = 0
max_points = None
max_extract_size = None
do_2d = False
quiet = False
do_comp = False
do_files = False
log_file = None
do_csv = False
do_rehash = False
save_plot = False
show_plot = True
show_grids = False
show_legend = True
entropy_scan = False
sig_scan_done = False
enable_plugins = True
exec_commands = True
show_invalid = False
entropy_algorithm = None
format_to_terminal = False
custom_signature = None
delay_extraction = False
ignore_time_skew = True
extract_rules_file = None
ignore_failed_open = False
extract_from_config = False
show_single_hex_dump = False
cleanup_after_extract = False
explicit_signature_scan = False
ignore_signature_keywords = False
magic_flags = binwalk.magic.MAGIC_NONE
markers = []
magic_files = []
file_opt_list = []
target_files = []
greps = []
excludes = []
searches = []
extracts = []
options = []
arguments = []
plugin_whitelist = []
plugin_blacklist = []
config = binwalk.Config()
# Require at least one argument (the target file)
if len(sys.argv) < MIN_ARGC:
binwalk.cmdopts.usage(sys.stderr)
try:
opts, args = GetOpt(sys.argv[1:], binwalk.cmdopts.short_options, binwalk.cmdopts.long_options)
except GetoptError as e:
sys.stderr.write("%s\n" % str(e))
binwalk.cmdopts.usage(sys.stderr)
for opt, arg in opts:
if opt in ("-h", "--help"):
binwalk.cmdopts.usage(sys.stdout)
elif opt in ("-?", "--examples"):
examples()
elif opt in ("-d", "--delay", "--honor-footers"):
delay_extraction = True
elif opt in ("-f", "--file"):
log_file = arg
elif opt in ("-c", "--csv"):
do_csv = True
elif opt in ("-q", "--quiet"):
quiet = True
elif opt in ("-s", "--strlen"):
strlen = binwalk.common.str2int(arg)
elif opt in ("-Q", "--no-legend"):
show_legend = False
elif opt in ("-J", "--save-plot"):
save_plot = True
elif opt in ("-N", "--no-plot"):
show_plot = False
elif opt in ("-V", "--show-grids"):
show_grids = True
elif opt in ("-3", "--3D", "--3d"):
requested_scans.append(binwalk.Binwalk.BINVIS)
do_2d = False
elif opt in ("-2", "--2D", "--2d"):
requested_scans.append(binwalk.Binwalk.BINVIS)
do_2d = True
elif opt in ("-E", "--entropy"):
requested_scans.append(binwalk.Binwalk.ENTROPY)
elif opt in ("-W", "--diff"):
requested_scans.append(binwalk.Binwalk.HEXDIFF)
elif opt in ("-P", "--rehash"):
do_rehash = True
elif opt in ("-w", "--terse"):
show_single_hex_dump = True
elif opt in ("-a", "--gzip"):
entropy_algorithm = 'gzip'
elif opt in("-t", "--term", "--tim"):
format_to_terminal = True
elif opt in("-p", "--disable-plugins"):
enable_plugins = False
elif opt in ("-b", "--dumb"):
ignore_signature_keywords = True
elif opt in ("-v", "--verbose"):
verbose += 1
elif opt in ("-S", "--strings"):
requested_scans.append(binwalk.Binwalk.STRINGS)
elif opt in ("-O", "--skip-unopened"):
ignore_failed_open = True
elif opt in ("-Z", "--max-points"):
max_points = binwalk.common.str2int(arg)
elif opt in ("-o", "--offset"):
offset = binwalk.common.str2int(arg)
elif opt in ("-l", "--length"):
length = binwalk.common.str2int(arg)
elif opt in ("-y", "--search", "--include"):
searches.append(arg)
elif opt in ("-x", "--exclude"):
excludes.append(arg)
elif opt in ("-D", "--dd"):
extracts.append(arg)
elif opt in ("-g", "--grep"):
greps.append(arg)
elif opt in ("-G", "--green"):
greps.append("32;")
elif opt in ("-i", "--red"):
greps.append("31;")
elif opt in ("-U", "--blue"):
greps.append("34;")
elif opt in ("-r", "--rm"):
cleanup_after_extract = True
elif opt in ("-m", "--magic"):
magic_files.append(arg)
elif opt in ("-k", "--keep-going"):
magic_flags |= binwalk.magic.MAGIC_CONTINUE
elif opt in ("-I", "--show-invalid"):
show_invalid = True
elif opt in ("-K", "--block"):
block_size = binwalk.common.str2int(arg)
elif opt in ("-X", "--disable-plugin"):
plugin_blacklist.append(arg)
elif opt in ("-Y", "--enable-plugin"):
plugin_whitelist.append(arg)
elif opt in ("-T", "--ignore-time-skew"):
ignore_time_skew = False
elif opt in ("-z", "--carve"):
exec_commands = False
elif opt in ("-j", "--max-size"):
max_extract_size = binwalk.common.str2int(arg)
elif opt in ("-H", "--heuristic", "--math"):
do_comp = True
if binwalk.Binwalk.ENTROPY not in requested_scans:
requested_scans.append(binwalk.Binwalk.ENTROPY)
elif opt in ("-F", "--marker"):
if ':' in arg:
(location, description) = arg.split(':', 1)
location = int(location)
markers.append((location, [{'description' : description, 'offset' : location}]))
elif opt in("-L", "--list-plugins"):
# List all user and system plugins, then exit
print('')
print('NAME TYPE ENABLED DESCRIPTION')
print('-' * 115)
with binwalk.Binwalk() as bw:
for (key, info) in iterator(binwalk.plugins.Plugins(bw).list_plugins()):
for module_name in info['modules']:
print('%-16s %-10s %-10s %s' % (module_name, key, info['enabled'][module_name], info['descriptions'][module_name]))
print ('')
sys.exit(1)
elif opt in ("-M", "--matryoshka"):
if arg:
matryoshka = binwalk.common.str2int(arg)
else:
# Original Zvyozdochkin matrhoska set had 8 dolls. This is a good number.
matryoshka = 8
elif opt in ("-e", "--extract"):
# If a file path was specified, use that as the extraction rules file
if arg:
extract_from_config = False
extract_rules_file = arg
# Else, use the default rules file
else:
extract_from_config = True
elif opt in ("-B", "--binwalk"):
requested_scans.append(binwalk.Binwalk.BINWALK)
# Load user file first so its signatures take precedence
magic_files.append(config.paths['user'][config.BINWALK_MAGIC_FILE])
magic_files.append(config.paths['system'][config.BINWALK_MAGIC_FILE])
elif opt in ("-A", "--opcodes"):
requested_scans.append(binwalk.Binwalk.BINARCH)
# Load user file first so its signatures take precedence
magic_files.append(config.paths['user'][config.BINARCH_MAGIC_FILE])
magic_files.append(config.paths['system'][config.BINARCH_MAGIC_FILE])
elif opt in ("-C", "--cast"):
requested_scans.append(binwalk.Binwalk.BINCAST)
# Don't stop at the first match (everything matches everything in this scan)
magic_flags |= binwalk.magic.MAGIC_CONTINUE
# Load user file first so its signatures take precedence
magic_files.append(config.paths['user'][config.BINCAST_MAGIC_FILE])
magic_files.append(config.paths['system'][config.BINCAST_MAGIC_FILE])
elif opt in ("-R", "--raw-bytes"):
custom_signature = arg
requested_scans.append(binwalk.Binwalk.CUSTOM)
explicit_signature_scan = True
elif opt in ("-u", "--update"):
try:
sys.stdout.write("Updating signatures...")
if verbose:
sys.stdout.write("\n")
sys.stdout.flush()
binwalk.Update(verbose).update()
sys.stdout.write("done.\n")
sys.exit(0)
except Exception as e:
if 'Permission denied' in str(e):
sys.stderr.write("failed (permission denied). Check your user permissions, or run the update as root.\n")
else:
sys.stderr.write('\n' + str(e) + '\n')
sys.exit(1)
# The --profile option is handled prior to calling main()
elif opt not in ('-P', '--profile'):
binwalk.cmdopts.usage(sys.stderr)
# Keep track of the options and arguments.
# This is used later to determine which argv entries are file names.
options.append(opt)
options.append("%s%s" % (opt, arg))
options.append("%s=%s" % (opt, arg))
arguments.append(arg)
# Treat any command line options not processed by getopt as target file paths.
for opt in sys.argv[1:]:
if opt not in arguments and opt not in options and not opt.startswith('-'):
file_opt_list.append(opt)
# Validate the target files listed in target_files
for tfile in file_opt_list:
# Ignore directories.
if not os.path.isdir(tfile):
# Make sure we can open the target files
try:
fd = open(tfile, "rb")
fd.close()
target_files.append(tfile)
except Exception as e:
sys.stdout.write("Cannot open file : %s\n" % str(e))
failed_open_count += 1
# Unless -O was specified, don't run the scan unless we are able to scan all specified files
if failed_open_count > 0 and not ignore_failed_open:
if failed_open_count > 1:
plural = 's'
else:
plural = ''
sys.stdout.write("Failed to open %d file%s for scanning, quitting...\n" % (failed_open_count, plural))
def usage(modules):
print modules.help()
sys.exit(1)
# If more than one target file was specified, enable verbose mode; else, there is
# nothing in the output to indicate which scan corresponds to which file.
if (matryoshka > 1 or len(target_files) > 1):
save_plot = True
if not verbose:
verbose = 1
elif len(target_files) == 0:
binwalk.cmdopts.usage(sys.stderr)
# Instantiate the Binwalk class
bwalk = binwalk.Binwalk(magic_files=magic_files,
flags=magic_flags,
verbose=verbose,
log=log_file,
quiet=quiet,
ignore_smart_keywords=ignore_signature_keywords,
load_plugins=enable_plugins,
ignore_time_skews=ignore_time_skew,
exec_commands=exec_commands,
max_extract_size=max_extract_size)
# If a custom signature was specified, create a temporary magic file containing the custom signature
# and ensure that it is the only magic file that will be loaded when Binwalk.scan() is called.
if custom_signature is not None:
bwalk.magic_files = [bwalk.parser.file_from_string(custom_signature)]
# Set any specified filters
bwalk.filter.exclude(excludes)
bwalk.filter.include(searches)
bwalk.filter.grep(filters=greps)
# Add any specified extract rules
bwalk.extractor.add_rule(extracts)
# If -e was specified, load the default extract rules
if extract_from_config:
bwalk.extractor.load_defaults()
# If --extract was specified, load the specified extraction rules file
if extract_rules_file is not None:
bwalk.extractor.load_from_file(extract_rules_file)
# Set the extractor cleanup value (True to clean up files, False to leave them on disk)
bwalk.extractor.cleanup_extracted_files(cleanup_after_extract)
# Enable delayed extraction, which will prevent supported file types from having trailing data when extracted
bwalk.extractor.enable_delayed_extract(delay_extraction)
# If --term was specified, enable output formatting to terminal
if format_to_terminal:
bwalk.display.enable_formatting(True)
# Enable log file CSV formatting, if specified and supported for all the requested scan types
if do_csv and binwalk.Binwalk.BINCAST not in requested_scans and binwalk.Binwalk.HEXDIFF not in requested_scans:
bwalk.display.enable_csv()
# If no scan was explicitly rquested, do a binwalk scan
if not requested_scans:
requested_scans.append(binwalk.Binwalk.BINWALK)
# If rehash was requested, add that here.
# We don't add it directly when parsing command line options, since that would require an explicit --binwalk scan request.
# Since rehash can only be run if a binwalk scan was already run, forcing the user to specify that is redundant.
if do_rehash:
requested_scans.append(binwalk.Binwalk.REHASH)
# Sort the scan types to ensure that scans are executed in the proper order (some scans rely on others being run first)
requested_scans.sort()
# Everything is set up, let's do a scan
try:
results = {}
def main():
modules = binwalk.module.Modules()
# Start the display_status function as a daemon thread.
t = Thread(target=display_status)
t = Thread(target=display_status, args=(modules,))
t.setDaemon(True)
t.start()
for scan_type in requested_scans:
if scan_type in [binwalk.Binwalk.BINWALK, binwalk.Binwalk.BINARCH, binwalk.Binwalk.BINCAST, binwalk.Binwalk.CUSTOM] and not sig_scan_done:
# There's no generic way for the binwalk class to know what
# scan type is being run, since all of these are signature scans,
# just with different magic files. Manually set the scan sub-type
# here to ensure that plugins can differentiate between the
# scans being performed.
bwalk.scan_type = scan_type
r = bwalk.scan(target_files,
offset=offset,
length=length,
show_invalid_results=show_invalid,
callback=bwalk.display.results,
start_callback=bwalk.display.header,
end_callback=bwalk.display.footer,
matryoshka=matryoshka,
plugins_whitelist=plugin_whitelist,
plugins_blacklist=plugin_blacklist)
bwalk.concatenate_results(results, r)
sig_scan_done = True
elif scan_type == binwalk.Binwalk.STRINGS:
r = bwalk.analyze_strings(target_files,
length=length,
offset=offset,
n=strlen,
block=block_size,
load_plugins=enable_plugins,
whitelist=plugin_whitelist,
blacklist=plugin_blacklist)
bwalk.concatenate_results(results, r)
elif scan_type == binwalk.Binwalk.COMPRESSION:
r = bwalk.analyze_compression(target_files, offset=offset, length=length)
bwalk.concatenate_results(results, r)
elif scan_type == binwalk.Binwalk.REHASH:
if len(sys.argv) == 1:
usage(modules)
elif not modules.execute():
modules.execute(*sys.argv[1:], signature=True)
diff_dirs = []
for target_file in target_files:
if has_key(results, target_file):
for (offset, offset_results) in results[target_file]:
for result in offset_results:
if has_key(result, "extract") and result["extract"]:
base_dir = result["extract"].split(os.path.sep)[0]
if base_dir and base_dir not in diff_dirs:
diff_dirs.append(base_dir)
if len(diff_dirs) > 1:
HashMatch(display=bwalk.display, cutoff=HashMatch.CONSERTATIVE_CUTOFF).directories(diff_dirs[0], diff_dirs[1:])
elif scan_type == binwalk.Binwalk.BINVIS:
# Always enable verbose mode; generating the plot can take some time for large files,
# and without verbose mode enabled it looks like binwalk is just sitting there doing nothing.
if do_2d:
bwalk.plot2d(target_files, offset=offset, length=length, max_points=max_points, show_grids=show_grids, verbose=True)
else:
bwalk.plot3d(target_files, offset=offset, length=length, max_points=max_points, show_grids=show_grids, verbose=True)
elif scan_type == binwalk.Binwalk.ENTROPY:
if not results:
for target_file in target_files:
results[target_file] = []
else:
bwalk.display.quiet = True
bwalk.display.cleanup()
for target_file in results.keys():
bwalk.concatenate_results(results, {target_file : markers})
bwalk.analyze_entropy(results,
offset,
length,
block_size,
show_plot,
show_legend,
save_plot,
algorithm=entropy_algorithm,
load_plugins=enable_plugins,
whitelist=plugin_whitelist,
blacklist=plugin_blacklist,
compcheck=do_comp)
elif scan_type == binwalk.Binwalk.HEXDIFF:
bwalk.hexdiff(target_files, offset=offset, length=length, block=block_size, first=show_single_hex_dump)
except KeyboardInterrupt:
pass
except IOError:
pass
# except Exception as e:
# print("Unexpected error: %s" % str(e))
bwalk.cleanup()
try:
if __name__ == '__main__':
try:
# Special options for profiling the code. For debug use only.
if '--profile' in sys.argv:
import cProfile
sys.argv.pop(sys.argv.index('--profile'))
cProfile.run('main()')
else:
main()
except KeyboardInterrupt:
pass
except KeyboardInterrupt:
print("")
......@@ -134,34 +134,38 @@ class MagicFilter:
return self.FILTER_INCLUDE
def invalid(self, data):
def valid_magic_result(self, data):
'''
Checks if the given string contains invalid data.
Called internally by Binwalk.scan().
@data - String to validate.
Returns True if data is invalid, False if valid.
Returns True if data is valid, False if invalid.
'''
# A result of 'data' is never ever valid.
if data == self.DATA_RESULT:
return True
return False
# If showing invalid results, just return False.
if self.show_invalid_results:
# Make sure this result wasn't filtered
if self.filter(data) == self.FILTER_EXCLUDE:
return False
# If showing invalid results, just return True without further checking.
if self.show_invalid_results:
return True
# Don't include quoted strings or keyword arguments in this search, as
# strings from the target file may legitimately contain the INVALID_RESULT text.
if self.INVALID_RESULT in common.strip_quoted_strings(self.smart._strip_tags(data)):
return True
return False
# There should be no non-printable characters in any of the data
if self.NON_PRINTABLE_RESULT in data:
return True
return False
return True
def grep(self, data=None, filters=[]):
'''
Add or check case-insensitive grep filters against the supplied data string.
......
......@@ -104,9 +104,6 @@ class Module(object):
'''
All module classes must be subclassed from this.
'''
# The module name, automatically populated.
NAME = ""
# The module title, as displayed in help output
TITLE = ""
......@@ -140,7 +137,8 @@ class Module(object):
# self.plugins = x
self.errors = []
self.results = []
self.NAME = self.__class__.__name__
self.status = None
self.name = self.__class__.__name__
process_kwargs(self, kwargs)
......@@ -218,14 +216,16 @@ class Module(object):
return args
def result(self, **kwargs):
def result(self, r=None, **kwargs):
'''
Validates a result, stores it in self.results and prints it.
Accepts the same kwargs as the binwalk.module.Result class.
@r - An existing instance of binwalk.module.Result.
Returns None.
'''
if r is None:
r = Result(**kwargs)
self.validate(r)
......@@ -233,6 +233,12 @@ class Module(object):
if r.valid:
self.results.append(r)
# Update the progress status automatically if it is not being done manually by the module
if r.file and not self.status.total:
self.status.total = r.file.length
self.status.completed = r.file.tell() - r.file.offset
if r.display:
display_args = self._build_display_args(r)
if display_args:
......@@ -266,12 +272,14 @@ class Module(object):
def footer(self):
self.config.display.footer()
def main(self):
def main(self, status):
'''
Responsible for calling self.init, initializing self.config.display, and calling self.run.
Returns the value returned from self.run.
'''
self.status = status
try:
self.init()
except KeyboardInterrupt as e:
......@@ -302,6 +310,16 @@ class Module(object):
return retval
class Status(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
self.clear()
def clear(self):
for (k,v) in iterator(self.kwargs):
setattr(self, k, v)
class Modules(object):
'''
Main class used for running and managing modules.
......@@ -316,21 +334,24 @@ class Modules(object):
Returns None.
'''
argv = list(argv)
self.arguments = []
self.loaded_modules = {}
self.status = Status(completed=0, total=0)
self._set_arguments(list(argv), kargv)
def _set_arguments(self, argv=[], kargv={}):
for (k,v) in iterator(kargv):
k = self._parse_api_opt(k)
if v not in [True, False, None]:
argv.append("%s %s" % (k, v))
else:
argv.append(k)
if not argv:
argv = sys.argv[1:]
if not argv and not self.arguments:
self.arguments = sys.argv[1:]
elif argv:
self.arguments = argv
self.loaded_modules = {}
def _parse_api_opt(self, opt):
# If the argument already starts with a hyphen, don't add hyphens in front of it
......@@ -384,18 +405,22 @@ class Modules(object):
return help_string + "\n"
def execute(self):
def execute(self, *args, **kwargs):
run_modules = []
self._set_arguments(list(args), kwargs)
for module in self.list():
if self.run(module):
run_modules.append(module)
obj = self.run(module)
if obj.enabled:
run_modules.append(obj)
return run_modules
def run(self, module):
obj = self.load(module)
if isinstance(obj, binwalk.module.Module) and obj.enabled:
obj.main()
obj.main(status=self.status)
self.status.clear()
# Add object to loaded_modules here, that way if a module has already been
# loaded directly and is subsequently also listed as a dependency we don't waste
......
......@@ -2,6 +2,7 @@ import os
import sys
import binwalk.common
import binwalk.module
import binwalk.config
import binwalk.display
from binwalk.config import *
from binwalk.compat import *
......@@ -87,6 +88,7 @@ class Configuration(binwalk.module.Module):
def load(self):
self.target_files = []
self.settings = binwalk.config.Config()
self.display = binwalk.display.Display(log=self.log_file,
csv=self.csv,
quiet=self.quiet,
......
import magic
import binwalk.config
import binwalk.module
import binwalk.parser
import binwalk.filter
......@@ -37,9 +36,6 @@ class Signature(binwalk.module.Module):
MAGIC_FLAGS = magic.MAGIC_NO_CHECK_TEXT | magic.MAGIC_NO_CHECK_ENCODING | magic.MAGIC_NO_CHECK_APPTYPE | magic.MAGIC_NO_CHECK_TOKENS
def init(self):
# Instantiate the config class so we can access file/directory paths
self.conf = binwalk.config.Config()
# Create SmartSignature and MagicParser class instances. These are mostly for internal use.
self.filter = binwalk.filter.MagicFilter()
self.smart = binwalk.smartsignature.SmartSignature(self.filter, ignore_smart_signatures=False)
......@@ -49,8 +45,8 @@ class Signature(binwalk.module.Module):
if not self.magic_files:
# Append the user's magic file first so that those signatures take precedence
self.magic_files = [
self.conf.paths['user'][self.conf.BINWALK_MAGIC_FILE],
self.conf.paths['system'][self.conf.BINWALK_MAGIC_FILE],
self.config.settings.paths['user'][self.config.settings.BINWALK_MAGIC_FILE],
self.config.settings.paths['system'][self.config.settings.BINWALK_MAGIC_FILE],
]
# Parse the magic file(s) and initialize libmagic
......@@ -61,15 +57,28 @@ class Signature(binwalk.module.Module):
# Once the temporary magic file is loaded into libmagic, we don't need it anymore; delete the temp file
self.parser.rm_magic_file()
def validate(self, r):
'''
Called automatically by self.result.
'''
if not r.description:
r.valid = False
if r.size and (r.size + r.offset) > r.file.size:
r.valid = False
if r.jump and (r.jump + r.offset) > r.file.size:
r.valid = False
def scan_file(self, fp):
while True:
current_block_offset = 0
(data, dlen) = fp.read_block()
if not data:
break
current_block_offset = 0
block_start = fp.tell() - dlen
self.status.completed = block_start - fp.offset
for candidate_offset in self.parser.find_signature_candidates(data, dlen):
if candidate_offset < current_block_offset:
......@@ -81,19 +90,27 @@ class Signature(binwalk.module.Module):
# Pass the data to libmagic, and split out multiple results into a list
magic_result = self.magic.buffer(candidate_data)
# TODO: Should filter process other validations? Reported size, for example?
if not self.filter.invalid(magic_result):
if self.filter.valid_magic_result(magic_result):
# The smart filter parser returns a dictionary of keyword values and the signature description.
smart = self.smart.parse(magic_result)
self.result(description=smart['description'], offset=block_start+candidate_offset)
r = self.smart.parse(magic_result)
r.offset = block_start + candidate_offset + r.adjust
r.file = fp
self.result(r=r)
if smart['jump'] > 0:
fp.seek(block_start + candidate_offset + smart['jump'])
current_block_offset = smart['jump']
if r.valid and r.jump > 0:
fp.seek(r.offset + r.jump)
current_block_offset = r.jump
def run(self):
for fp in self.config.target_files:
self.header()
self.status.clear()
self.status.total = fp.size
self.status.completed = 0
self.scan_file(fp)
self.footer()
......@@ -55,7 +55,9 @@ class MagicParser:
def __del__(self):
try:
self.cleanup()
except:
except KeyboardInterrupt as e:
raise e
except Exception:
pass
def rm_magic_file(self):
......@@ -66,7 +68,9 @@ class MagicParser:
'''
try:
self.fd.close()
except:
except KeyboardInterrupt as e:
raise e
except Exception:
pass
def cleanup(self):
......@@ -79,7 +83,9 @@ class MagicParser:
try:
self.raw_fd.close()
except:
except KeyboardInterrupt as e:
raise e
except Exception:
pass
def file_from_string(self, signature_string, offset=0, display_name=DEFAULT_DISPLAY_NAME):
......@@ -170,6 +176,8 @@ class MagicParser:
self.fd.write(str2bytes(line))
self.build_signature_set()
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise Exception("Error parsing magic file '%s' on line %d: %s" % (file_name, line_count, str(e)))
......@@ -207,6 +215,8 @@ class MagicParser:
# The condition line may contain escaped sequences, so be sure to decode it properly.
entry['condition'] = string_decode(line_parts[2])
entry['description'] = ' '.join(line_parts[3:])
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise Exception("%s :: %s", (str(e), line))
......@@ -214,6 +224,8 @@ class MagicParser:
# throw an exception, but let's catch it just in case...
try:
entry['offset'] = str2int(entry['offset'])
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise Exception("%s :: %s", (str(e), line))
......@@ -234,6 +246,8 @@ class MagicParser:
# but needing that is rare.
try:
intval = str2int(entry['condition'].strip('L'))
except KeyboardInterrupt as e:
raise e
except Exception as e:
raise Exception("Failed to evaluate condition for '%s' type: '%s', condition: '%s', error: %s" % (entry['description'], entry['type'], entry['condition'], str(e)))
......@@ -322,6 +336,8 @@ class MagicParser:
'''
try:
return data.split(self.RESULT_SEPERATOR)
except:
except KeyboardInterrupt as e:
raise e
except Exception:
return []
import os
import sys
import imp
import binwalk.config
from binwalk.compat import *
# Valid return values for plugins
PLUGIN_CONTINUE = 0x00
PLUGIN_NO_EXTRACT = 0x01
PLUGIN_NO_DISPLAY = 0x02
PLUGIN_STOP_PLUGINS = 0x04
PLUGIN_TERMINATE = 0x08
class Plugins:
'''
Class to load and call plugin callback functions, handled automatically by Binwalk.scan / Binwalk.single_scan.
......@@ -66,7 +60,7 @@ class Plugins:
# Set to False to have this plugin disabled by default.
ENABLED = True
def __init__(self, binwalk):
def __init__(self):
self.binwalk = binwalk
print 'Scanning initialized!'
......@@ -86,16 +80,15 @@ class Plugins:
return PLUGIN_CONTINUE
'''
CALLBACK = 'callback'
RESULT = 'result'
PRESCAN = 'pre_scan'
POSTSCAN = 'post_scan'
PREPARSER = 'pre_parser'
PLUGIN = 'Plugin'
MODULE_EXTENSION = '.py'
def __init__(self, binwalk, whitelist=[], blacklist=[]):
self.binwalk = binwalk
self.callback = []
def __init__(self, whitelist=[], blacklist=[]):
self.config = binwalk.config.Config()
self.result = []
self.pre_scan = []
self.pre_parser = []
self.post_scan = []
......@@ -103,15 +96,12 @@ class Plugins:
self.blacklist = blacklist
def __del__(self):
self._cleanup()
pass
def __exit__(self, t, v, traceback):
self._cleanup()
def __enter__(self):
return self
def _cleanup(self):
try:
del self.binwalk
except:
def __exit__(self, t, v, traceback):
pass
def _call_plugins(self, callback_list, arg):
......@@ -168,7 +158,7 @@ class Plugins:
}
for key in plugins.keys():
plugins[key]['path'] = self.binwalk.config.paths[key][self.binwalk.config.PLUGINS]
plugins[key]['path'] = self.config.paths[key][self.config.PLUGINS]
for file_name in os.listdir(plugins[key]['path']):
if file_name.endswith(self.MODULE_EXTENSION):
......@@ -214,10 +204,10 @@ class Plugins:
except:
pass
class_instance = plugin_class(self.binwalk)
class_instance = plugin_class()
try:
self.callback.append(getattr(class_instance, self.CALLBACK))
self.result.append(getattr(class_instance, self.RESULT))
except:
pass
......@@ -227,11 +217,6 @@ class Plugins:
pass
try:
self.pre_parser.append(getattr(class_instance, self.PREPARSER))
except:
pass
try:
self.post_scan.append(getattr(class_instance, self.POSTSCAN))
except:
pass
......@@ -239,15 +224,12 @@ class Plugins:
except Exception as e:
sys.stderr.write("WARNING: Failed to load plugin module '%s': %s\n" % (module, str(e)))
def _pre_scan_callbacks(self, fd):
return self._call_plugins(self.pre_scan, fd)
def _post_scan_callbacks(self, fd):
return self._call_plugins(self.post_scan, fd)
def _pre_scan_callbacks(self, obj):
return self._call_plugins(self.pre_scan, obj)
def _scan_callbacks(self, results):
return self._call_plugins(self.callback, results)
def _post_scan_callbacks(self, obj):
return self._call_plugins(self.post_scan, obj)
def _scan_pre_parser_callbacks(self, results):
return self._call_plugins(self.pre_parser, results)
def _result_callbacks(self, obj):
return self._call_plugins(self.result, obj)
import re
import binwalk.module
from binwalk.compat import *
from binwalk.common import str2int, get_quoted_strings, MathExpression
......@@ -45,7 +46,7 @@ class SmartSignature:
Returns None.
'''
self.filter = filter
self.invalid = False
self.valid = True
self.last_one_of_many = None
self.ignore_smart_signatures = ignore_smart_signatures
......@@ -68,18 +69,20 @@ class SmartSignature:
'adjust' : 0, # The relative offset to add to the reported offset
'year' : 0, # The file's creation/modification year, if reported in the signature
'epoch' : 0, # The file's creation/modification epoch time, if reported in the signature
'invalid' : False, # Set to True if parsed numerical values appear invalid
'valid' : True, # Set to False if parsed numerical values appear invalid
}
self.invalid = False
self.valid = True
# If smart signatures are disabled, or the result data is not valid (i.e., potentially malicious),
# don't parse anything, just return the raw data as the description.
if self.ignore_smart_signatures or not self._is_valid(data):
results['description'] = data
else:
# Calculate and replace math keyword values
# Calculate and replace special keywords/values
data = self._replace_maths(data)
data = self._parse_raw_strings(data)
data = self._parse_string_len(data)
# Parse the offset-adjust value. This is used to adjust the reported offset at which
# a signature was located due to the fact that MagicParser.match expects all signatures
......@@ -91,17 +94,23 @@ class SmartSignature:
# extracted (see Binwalk.scan).
try:
results['size'] = str2int(self._get_math_arg(data, 'filesize'))
except:
except KeyboardInterrupt as e:
raise e
except Exception:
pass
try:
results['year'] = str2int(self._get_keyword_arg(data, 'year'))
except:
except KeyboardInterrupt as e:
raise e
except Exception:
pass
try:
results['epoch'] = str2int(self._get_keyword_arg(data, 'epoch'))
except:
except KeyboardInterrupt as e:
raise e
except Exception:
pass
results['delay'] = self._get_keyword_arg(data, 'delay')
......@@ -116,9 +125,9 @@ class SmartSignature:
results['name'] = self._get_keyword_arg(data, 'filename').strip('"')
results['description'] = self._strip_tags(data)
results['invalid'] = self.invalid
results['valid'] = self.valid
return results
return binwalk.module.Result(**results)
def _is_valid(self, data):
'''
......@@ -152,7 +161,7 @@ class SmartSignature:
Returns True if the string result is one of many.
Returns False if the string result is not one of many.
'''
if not self.filter.invalid(data):
if self.filter.valid_magic_result(data):
if self.last_one_of_many is not None and data.startswith(self.last_one_of_many):
return True
......@@ -197,7 +206,7 @@ class SmartSignature:
value = MathExpression(arg).value
if value is None:
value = 0
self.invalid = True
self.valid = False
return value
......@@ -215,7 +224,9 @@ class SmartSignature:
if offset_str:
try:
offset = str2int(offset_str)
except:
except KeyboardInterrupt as e:
raise e
except Exception:
pass
return offset
......@@ -281,7 +292,9 @@ class SmartSignature:
# Convert the string to an integer as a sanity check
try:
string_length = '%d' % len(raw_string)
except:
except KeyboardInterrupt as e:
raise e
except Exception:
string_length = '0'
# Strip out *everything* after the string-len keyword, including the keyword itself.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment