Commit 778ee6f5 by devttys0

Integrated plugins with new API

No related merge requests found
......@@ -39,7 +39,7 @@ class Config:
# Sub directories
BINWALK_USER_DIR = ".binwalk"
BINWALK_MAGIC_DIR = "magic"
BINWALK_MAGIC_DIR = "magics"
BINWALK_CONFIG_DIR = "configs"
BINWALK_PLUGINS_DIR = "plugins"
......
# MIPS prologue
# addiu $sp, -XX
# 27 BD FF XX
0 string \377\275\47 MIPSEL instructions, function prologue{offset-adjust:-1}
0 string \47\275\377 MIPS instructions, function prologue
# MIPS epilogue
# jr $ra
0 belong 0x03e00008 MIPS instructions, function epilogue
0 lelong 0x03e00008 MIPSEL instructions, function epilogue
# PowerPC prologue
# mflr r0
0 belong 0x7C0802A6 PowerPC big endian instructions, function prologue
0 lelong 0x7C0802A6 PowerPC little endian instructions, funciton prologue
# PowerPC epilogue
# blr
0 belong 0x4E800020 PowerPC big endian instructions, function epilogue
0 lelong 0x4E800020 PowerPC little endian instructions, function epilogue
# ARM prologue
# STMFD SP!, {XX}
0 beshort 0xE92D ARMEB instructions, function prologue
0 leshort 0xE92D ARM instructions, function prologue{offset-adjust:-2}
# ARM epilogue
# LDMFD SP!, {XX}
0 beshort 0xE8BD ARMEB instructions, function epilogue
0 leshort 0xE8BD ARM instructions, function epilogue{offset-adjust:-2}
# Ubicom32 prologue
# move.4 -4($sp)++, $ra
0 belong 0x02FF6125 Ubicom32 instructions, function prologue
# Ubicom32 epilogues
# calli $ra, 0($ra)
# ret ($sp)4++
0 belong 0xF0A000A0 Ubicom32 instructions, function epilogue
0 belong 0x000022E1 Ubicom32 instructions, function epilogue
# AVR8 prologue
# push r28
# push r29
0 belong 0x93CF93DF AVR8 instructions, function prologue
0 belong 0x93DF93CF AVR8 instructions, function prologue
# AVR32 prologue
# pushm r7,lr
# mov r7,sp
0 string \xEB\xCD\x40\x80\x1A\x97 AVR32 instructions, function prologue
# SPARC eiplogue
# ret
# restore XX
0 string \x81\xC7\xE0\x08\x81\xE8 SPARC instructions, function epilogue
# x86 epilogue
# push ebp
# move ebp, esp
0 string \x55\x89\xE5 Intel x86 instructions, function epilogue
0 belong x Hex: 0x%.8X
#0 string x String: %s
#0 lequad x Little Endian Quad: %lld
#0 bequad x Big Endian Quad: %lld
0 lelong x Little Endian Long: %d
0 belong x Big Endian Long: %d
0 leshort x Little Endian Short: %d
0 beshort x Big Endian Short: %d
0 ledate x Little Endian Date: %s
0 bedate x Big Endian Date: %s
......@@ -476,15 +476,19 @@
>4 byte x \b%d
# New LZMA format signature
0 string \xFFLZMA\x00 LZMA compressed data (new),
>6 byte&0x10 0 single-block stream
>6 byte&0x10 0x10 multi-block stream
# See lzma file for LZMA signatures
0 string \xFFLZMA\x00 LZMA compressed data (new),
>6 byte&0x10 0 single-block stream
>6 byte&0x10 0x10 multi-block stream
0 string \xff\x06\x00\x00\x73\x4e\x61\x50\x70\x59 Snappy compression, stream identifier
0 string \x1f\x9d\x90 compress'd data, 16 bits
#0 beshort 0x7801 Zlib header, no compression
0 beshort 0x789c Zlib header, default compression
0 beshort 0x78da Zlib header, best compression
0 beshort 0x785e Zlib header, compressed
# Type: OpenSSL certificates/key files
# From: Nicolas Collignon <tsointsoin@gmail.com>
......
0 string \x1f\x9d\x90 compress'd data, 16 bits
#0 beshort 0x7801 Zlib header, no compression
0 beshort 0x789c Zlib header, default compression
0 beshort 0x78da Zlib header, best compression
0 beshort 0x785e Zlib header, compressed
......@@ -5,6 +5,7 @@ import inspect
import argparse
import binwalk.common
import binwalk.config
import binwalk.plugin
from binwalk.compat import *
class ModuleOption(object):
......@@ -72,6 +73,7 @@ class Result(object):
@file - The file object of the scanned file.
@valid - Set to True if the result if value, False if invalid.
@display - Set to True to display the result to the user, False to hide it.
@extract - Set to True to flag this result for extraction.
Provide additional kwargs as necessary.
Returns None.
......@@ -81,6 +83,7 @@ class Result(object):
self.file = None
self.valid = True
self.display = True
self.extract = True
for (k, v) in iterator(kwargs):
setattr(self, k, v)
......@@ -134,12 +137,11 @@ class Module(object):
RESULT = ['offset', 'description']
def __init__(self, dependency=False, **kwargs):
# TODO: Instantiate plugins object
# self.plugins = x
self.errors = []
self.results = []
self.status = None
self.name = self.__class__.__name__
self.plugins = binwalk.plugin.Plugins(self)
process_kwargs(self, kwargs)
......@@ -154,6 +156,8 @@ class Module(object):
raise e
except Exception as e:
self.error(exception=e)
self.plugins.load_plugins()
def load(self):
'''
......@@ -193,15 +197,13 @@ class Module(object):
return None
def _plugins_pre_scan(self):
# plugins(self)
return None
self.plugins.pre_scan_callbacks(self)
def _plugins_post_scan(self):
# plugins(self)
return None
self.plugins.post_scan_callbacks(self)
def _plugins_callback(self, r):
return None
def _plugins_result(self, r):
self.plugins.scan_callbacks(r)
def _build_display_args(self, r):
args = []
......@@ -230,7 +232,7 @@ class Module(object):
r = Result(**kwargs)
self.validate(r)
self._plugins_callback(r)
self._plugins_result(r)
if r.valid:
self.results.append(r)
......
......@@ -34,15 +34,6 @@ class Plugins:
This method is called after running a scan against a file, but before the file has been closed.
It is passed the file object of the scanned file.
Valid return values for all plugin callbacks are (PLUGIN_* values may be OR'd together):
PLUGIN_CONTINUE - Do nothing, continue the scan normally.
PLUGIN_NO_EXTRACT - Do not preform data extraction.
PLUGIN_NO_DISPLAY - Ignore the result(s); they will not be displayed or further processed.
PLUGIN_STOP_PLUGINS - Do not call any other plugins.
PLUGIN_TERMINATE - Terminate the scan.
None - The same as PLUGIN_CONTINUE.
Values returned by pre_scan affect all results during the scan of that particular file.
Values returned by callback affect only that specific scan result.
Values returned by post_scan are ignored since the scan of that file has already been completed.
......@@ -50,50 +41,20 @@ class Plugins:
By default, all plugins are loaded during binwalk signature scans. Plugins that wish to be disabled by
default may create a class variable named 'ENABLED' and set it to False. If ENABLED is set to False, the
plugin will only be loaded if it is explicitly named in the plugins whitelist.
Simple example plugin:
from binwalk.plugins import *
class Plugin:
# Set to False to have this plugin disabled by default.
ENABLED = True
def __init__(self):
self.binwalk = binwalk
print 'Scanning initialized!'
def __del__(self):
print 'Scanning complete!'
def pre_scan(self, fd):
print 'About to scan', fd.name
return PLUGIN_CONTINUE
def callback(self, results):
print 'Got a result:', results['description']
return PLUGIN_CONTINUE
def post_scan(self, fd):
print 'Done scanning', fd.name
return PLUGIN_CONTINUE
'''
RESULT = 'result'
SCAN = 'scan'
PRESCAN = 'pre_scan'
POSTSCAN = 'post_scan'
PLUGIN = 'Plugin'
MODULE_EXTENSION = '.py'
def __init__(self, whitelist=[], blacklist=[]):
self.config = binwalk.config.Config()
self.result = []
def __init__(self, parent=None):
self.scan = []
self.pre_scan = []
self.pre_parser = []
self.post_scan = []
self.whitelist = whitelist
self.blacklist = blacklist
self.parent = parent
self.config = binwalk.config.Config()
def __del__(self):
pass
......@@ -105,23 +66,14 @@ class Plugins:
pass
def _call_plugins(self, callback_list, arg):
retval = PLUGIN_CONTINUE
for callback in callback_list:
if (retval & PLUGIN_STOP_PLUGINS):
break
try:
val = callback(arg)
if val is not None:
retval |= val
callback(arg)
except KeyboardInterrupt as e:
raise e
except Exception as e:
sys.stderr.write("WARNING: %s.%s failed: %s\n" % (callback.__module__, callback.__name__, e))
return retval
def list_plugins(self):
'''
Obtain a list of all user and system plugin modules.
......@@ -165,32 +117,22 @@ class Plugins:
for file_name in os.listdir(plugins[key]['path']):
if file_name.endswith(self.MODULE_EXTENSION):
module = file_name[:-len(self.MODULE_EXTENSION)]
if module in self.blacklist:
continue
else:
plugin = imp.load_source(module, os.path.join(plugins[key]['path'], file_name))
plugin_class = getattr(plugin, self.PLUGIN)
try:
enabled = plugin_class.ENABLED
except KeyboardInterrupt as e:
raise e
except Exception as e:
enabled = True
plugins[key]['enabled'][module] = enabled
plugin = imp.load_source(module, os.path.join(plugins[key]['path'], file_name))
plugin_class = getattr(plugin, self.PLUGIN)
plugins[key]['modules'].append(module)
plugins[key]['enabled'][module] = True
plugins[key]['modules'].append(module)
try:
plugins[key]['descriptions'][module] = plugin_class.__doc__.strip().split('\n')[0]
except KeyboardInterrupt as e:
raise e
except Exception as e:
plugins[key]['descriptions'][module] = 'No description'
try:
plugins[key]['descriptions'][module] = plugin_class.__doc__.strip().split('\n')[0]
except KeyboardInterrupt as e:
raise e
except Exception as e:
plugins[key]['descriptions'][module] = 'No description'
return plugins
def _load_plugins(self):
def load_plugins(self):
plugins = self.list_plugins()
self._load_plugin_modules(plugins['user'])
self._load_plugin_modules(plugins['system'])
......@@ -203,19 +145,10 @@ class Plugins:
plugin = imp.load_source(module, file_path)
plugin_class = getattr(plugin, self.PLUGIN)
try:
# If this plugin is disabled by default and has not been explicitly white listed, ignore it
if plugin_class.ENABLED == False and module not in self.whitelist:
continue
except KeyboardInterrupt as e:
raise e
except Exception as e:
pass
class_instance = plugin_class()
class_instance = plugin_class(self.parent)
try:
self.result.append(getattr(class_instance, self.RESULT))
self.scan.append(getattr(class_instance, self.SCAN))
except KeyboardInterrupt as e:
raise e
except Exception as e:
......@@ -240,12 +173,12 @@ class Plugins:
except Exception as e:
sys.stderr.write("WARNING: Failed to load plugin module '%s': %s\n" % (module, str(e)))
def _pre_scan_callbacks(self, obj):
def pre_scan_callbacks(self, obj):
return self._call_plugins(self.pre_scan, obj)
def _post_scan_callbacks(self, obj):
def post_scan_callbacks(self, obj):
return self._call_plugins(self.post_scan, obj)
def _result_callbacks(self, obj):
return self._call_plugins(self.result, obj)
def scan_callbacks(self, obj):
return self._call_plugins(self.scan, obj)
from binwalk.common import *
from binwalk.plugins import *
class Plugin:
'''
Validates ARM instructions during opcode scans.
'''
BITMASK = 0x83FF
BITMASK_SIZE = 2
def __init__(self, binwalk):
self.fd = None
if binwalk.scan_type == binwalk.BINARCH:
self.enabled = True
else:
self.enabled = False
def pre_scan(self, fd):
if self.enabled:
self.fd = BlockFile(fd.name, 'r')
def callback(self, results):
if self.fd:
data = ''
try:
if results['description'].startswith('ARM instruction'):
self.fd.seek(results['offset'])
data = self.fd.read(self.BITMASK_SIZE)
data = data[1] + data[0]
elif results['description'].startswith('ARMEB instruction'):
self.fd.seek(results['offset']+self.BITMASK_SIZE)
data = self.fd.read(self.BITMASK_SIZE)
if data:
registers = int(data.encode('hex'), 16)
if (registers & self.BITMASK) != registers:
return PLUGIN_NO_DISPLAY
except:
pass
def post_scan(self, fd):
try:
self.fd.close()
except:
pass
import ctypes
import ctypes.util
from binwalk.common import *
from binwalk.plugins import *
class Plugin:
'''
Searches for and validates compress'd data.
'''
ENABLED = True
READ_SIZE = 64
def __init__(self, binwalk):
def __init__(self, module):
self.fd = None
self.comp = None
self.binwalk = binwalk
compressd_magic_file = binwalk.config.find_magic_file("compressd")
if binwalk.scan_type == binwalk.BINWALK:
if module.name == 'Signature':
self.comp = ctypes.cdll.LoadLibrary(ctypes.util.find_library("compress42"))
if self.comp:
binwalk.magic_files.append(compressd_magic_file)
elif compressd_magic_file in binwalk.magic_files:
binwalk.magic_files.pop(binwalk.magic_files.index(compressd_magic_file))
def __del__(self):
try:
self.fd.close()
except:
pass
def pre_scan(self, fd):
try:
if self.comp:
self.fd = BlockFile(fd.name, 'r')
except:
pass
def callback(self, results):
if self.fd and results['description'].lower().startswith("compress'd data"):
self.fd.seek(results['offset'])
compressed_data = self.fd.read(self.READ_SIZE)
def scan(self, result):
if self.comp:
if result.file and result.description.lower().startswith("compress'd data"):
fd = BlockFile(result.file.name, "r")
fd.seek(result.offset)
compressed_data = fd.read(self.READ_SIZE)
if not self.comp.is_compressed(compressed_data, len(compressed_data)):
return (PLUGIN_NO_DISPLAY | PLUGIN_NO_EXTRACT)
if not self.comp.is_compressed(compressed_data, len(compressed_data)):
result.valid = False
fd.close()
from binwalk.plugins import *
from binwalk.plugin import *
class Plugin:
'''
Ensures that ASCII CPIO archive entries only get extracted once.
'''
def __init__(self, binwalk):
self.binwalk = binwalk
def __init__(self, module):
self.found_archive = False
def pre_scan(self, fd):
self.enabled = (module.name == 'Signature')
def pre_scan(self, module):
# Be sure to re-set this at the beginning of every scan
self.found_archive = False
def callback(self, results):
if self.binwalk.extractor.enabled and self.binwalk.scan_type == self.binwalk.BINWALK:
def scan(self, result):
if self.enabled and result.valid:
# ASCII CPIO archives consist of multiple entries, ending with an entry named 'TRAILER!!!'.
# Displaying each entry is useful, as it shows what files are contained in the archive,
# but we only want to extract the archive when the first entry is found.
if results['description'].startswith('ASCII cpio archive'):
if result.description.startswith('ASCII cpio archive'):
if not self.found_archive:
# This is the first entry. Set found_archive and allow the scan to continue normally.
self.found_archive = True
return PLUGIN_CONTINUE
result.extract = True
elif 'TRAILER!!!' in results['description']:
# This is the last entry, un-set found_archive.
self.found_archive = False
# The first entry has already been found and this is the last entry, or the last entry
# has not yet been found. Don't extract.
return PLUGIN_NO_EXTRACT
# Allow all other results to continue normally.
return PLUGIN_CONTINUE
result.extract = False
#!/usr/bin/env python
import ctypes
import ctypes.util
from binwalk.plugins import *
from binwalk.common import BlockFile
class Plugin:
'''
Finds and extracts raw deflate compression streams.
'''
ENABLED = False
SIZE = 33*1024
# To prevent many false positives, only show data that decompressed to a reasonable size and didn't just result in a bunch of NULL bytes
MIN_DECOMP_SIZE = 32*1024
DESCRIPTION = "Deflate compressed data stream"
def __init__(self, binwalk):
self.binwalk = binwalk
# The tinfl library is built and installed with binwalk
self.tinfl = ctypes.cdll.LoadLibrary(ctypes.util.find_library("tinfl"))
if self.tinfl:
# Add an extraction rule
if self.binwalk.extractor.enabled:
self.binwalk.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="deflate", cmd=self._extractor)
def pre_scan(self, fp):
if self.tinfl:
# Make sure we'll be getting enough data for a good decompression test
if fp.MAX_TRAILING_SIZE < self.SIZE:
fp.MAX_TRAILING_SIZE = self.SIZE
self._deflate_scan(fp)
return PLUGIN_TERMINATE
def _extractor(self, file_name):
if self.tinfl:
out_file = os.path.splitext(file_name)[0]
self.tinfl.inflate_raw_file(file_name, out_file)
def _deflate_scan(self, fp):
# Set these so that the progress report reflects the current scan status
self.binwalk.scan_length = fp.length
self.binwalk.total_scanned = 0
while self.binwalk.total_scanned < self.binwalk.scan_length:
current_total = self.binwalk.total_scanned
(data, dlen) = fp.read_block()
if not data or dlen == 0:
break
# dlen == block size, but data includes MAX_TRAILING_SIZE data as well
actual_dlen = len(data)
for i in range(0, dlen):
decomp_size = self.tinfl.is_deflated(data[i:], actual_dlen-i, 0)
if decomp_size >= self.MIN_DECOMP_SIZE:
loc = fp.offset + current_total + i
description = self.DESCRIPTION + ', uncompressed size >= %d' % decomp_size
# Extract the file
if self.binwalk.extractor.enabled:
self.binwalk.extractor.extract(loc, description, fp.name, (fp.size - loc))
# Display results after extraction to be consistent with normal binwalk scans
self.binwalk.display.easy_results(loc, description)
# Update total_scanned here for immediate progress feedback
self.binwalk.total_scanned = current_total + i
if (current_total + i) > self.binwalk.scan_length:
break
# Set total_scanned here in case no data streams were identified
self.binwalk.total_scanned = current_total + dlen
......@@ -9,23 +9,21 @@ class Plugin:
Based on Bernardo Rodrigues' work: http://w00tsec.blogspot.com/2013/11/unpacking-firmware-images-from-cable.html
'''
ENABLED = True
FAKE_LZMA_SIZE = "\x00\x00\x00\x10\x00\x00\x00\x00"
SIGNATURE = "lzma compressed data"
def __init__(self, binwalk):
self.binwalk = binwalk
def __init__(self, module):
self.original_cmd = ''
self.enabled = (module.name == 'Signature')
if self.binwalk.extractor.enabled:
#if module.extractor.enabled:
# Replace the existing LZMA extraction command with our own
rules = self.binwalk.extractor.get_rules()
for i in range(0, len(rules)):
if rules[i]['regex'].match(self.SIGNATURE):
self.original_cmd = rules[i]['cmd']
rules[i]['cmd'] = self.lzma_cable_extractor
break
# rules = self.binwalk.extractor.get_rules()
# for i in range(0, len(rules)):
# if rules[i]['regex'].match(self.SIGNATURE):
# self.original_cmd = rules[i]['cmd']
# rules[i]['cmd'] = self.lzma_cable_extractor
# break
def lzma_cable_extractor(self, fname):
# Try extracting the LZMA file without modification first
......@@ -55,10 +53,10 @@ class Plugin:
shutil.move(out_name, fname)
self.binwalk.extractor.execute(self.original_cmd, fname)
def pre_parser(self, result):
def scan(self, result):
# The modified cable modem LZMA headers all have valid dictionary sizes and a properties byte of 0x5D.
if result['description'].lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result['description']:
if "properties: 0x5D" in result['description'] and "invalid dictionary size" not in result['description']:
result['invalid'] = False
result['description'] = result['description'].split("invalid uncompressed size")[0] + "missing uncompressed size"
if result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if "properties: 0x5D" in result.description and "invalid dictionary size" not in result.description:
result.valid = True
result.description = result.description.split("invalid uncompressed size")[0] + "missing uncompressed size"
from binwalk.compat import *
class Plugin:
'''
Modifies string analysis output to mimic that of the Unix strings utility.
'''
ENABLED = False
def __init__(self, binwalk):
self.modify_output = False
if binwalk.scan_type == binwalk.STRINGS:
binwalk.display.quiet = True
self.modify_output = True
def callback(self, results):
if self.modify_output:
try:
print(results['description'])
except Exception as e:
pass
import ctypes
import ctypes.util
from binwalk.plugins import *
from binwalk.common import BlockFile
class Plugin:
......@@ -11,44 +10,27 @@ class Plugin:
MIN_DECOMP_SIZE = 16*1024
MAX_DATA_SIZE = 33 * 1024
def __init__(self, binwalk):
self.fd = None
def __init__(self, module):
self.tinfl = None
zlib_magic_file = binwalk.config.find_magic_file('zlib')
# Only initialize this plugin if this is a normal binwalk signature scan
if binwalk.scan_type == binwalk.BINWALK:
# Only initialize this plugin if this is a signature scan
if module.name == 'Signature':
# Load libtinfl.so
self.tinfl = ctypes.cdll.LoadLibrary(ctypes.util.find_library('tinfl'))
if self.tinfl:
# Add the zlib file to the list of magic files
binwalk.magic_files.append(zlib_magic_file)
# Else, be sure to unload the zlib file from the list of magic signatures
elif zlib_magic_file in binwalk.magic_files:
binwalk.magic_files.pop(binwalk.magic_files.index(zlib_magic_file))
def pre_scan(self, fd):
if self.tinfl:
self.fd = BlockFile(fd.name, 'r')
def callback(self, result):
def scan(self, result):
# If this result is a zlib signature match, try to decompress the data
if self.fd and result['description'].lower().startswith('zlib'):
if self.tinfl and result.file and result.description.lower().startswith('zlib'):
# Seek to and read the suspected zlib data
self.fd.seek(result['offset'])
data = self.fd.read(self.MAX_DATA_SIZE)
fd = BlockFile(result.file.name, "r")
fd.seek(result.offset)
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
# Check if this is valid zlib data
decomp_size = self.tinfl.is_deflated(data, len(data), 1)
if decomp_size > 0:
result['description'] += ", uncompressed size >= %d" % decomp_size
result.description += ", uncompressed size >= %d" % decomp_size
else:
return (PLUGIN_NO_DISPLAY | PLUGIN_NO_EXTRACT)
return PLUGIN_CONTINUE
def post_scan(self, fd):
if self.fd:
self.fd.close()
result.valid = False
......@@ -160,12 +160,16 @@
>4 byte x \b%d
# New LZMA format signature
0 string \xFFLZMA\x00 LZMA compressed data (new),
>6 byte&0x10 0 single-block stream
>6 byte&0x10 0x10 multi-block stream
# See lzma file for LZMA signatures
0 string \xFFLZMA\x00 LZMA compressed data (new),
>6 byte&0x10 0 single-block stream
>6 byte&0x10 0x10 multi-block stream
0 string \xff\x06\x00\x00\x73\x4e\x61\x50\x70\x59 Snappy compression, stream identifier
0 string \x1f\x9d\x90 compress'd data, 16 bits
#0 beshort 0x7801 Zlib header, no compression
0 beshort 0x789c Zlib header, default compression
0 beshort 0x78da Zlib header, best compression
0 beshort 0x785e Zlib header, compressed
......@@ -97,7 +97,7 @@ os.chdir(working_directory)
print("generating binwalk magic file")
magic_files = listdir("magic")
magic_files.sort()
fd = open("binwalk/magic/binwalk", "wb")
fd = open("binwalk/magics/binwalk", "wb")
for magic in magic_files:
fpath = path.join("magic", magic)
if path.isfile(fpath):
......@@ -105,7 +105,7 @@ for magic in magic_files:
fd.close()
# The data files to install along with the binwalk module
install_data_files = ["magic/*", "config/*", "plugins/*", "modules/*"]
install_data_files = ["magics/*", "configs/*", "plugins/*", "modules/*"]
# Install the binwalk module, script and support files
setup( name = "binwalk",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment