Commit 23131dad by devttys0

All plugin classes now sub-classed from binwalk.core.plugin.Plugin.

parent b3cff800
......@@ -4,7 +4,56 @@ import imp
import binwalk.core.settings
from binwalk.core.compat import *
class Plugins:
class Plugin(object):
'''
Class from which all plugin classes are based.
'''
# A list of case-sensitive module names for which this plugin should be loaded.
# If no module names are specified, the plugin will be loaded for all modules.
MODULES = []
def __init__(self, module):
'''
Class constructor.
@module - A handle to the current module that this plugin is loaded for.
Returns None.
'''
self.module = module
if not self.MODULES or self.module.name in self.MODULES:
self._enabled = True
self.init()
else:
self._enabled = False
def init(self):
'''
Child class should override this if needed.
Invoked during plugin initialization.
'''
pass
def pre_scan(self):
'''
Child class should override this if needed.
'''
pass
def scan(self, result):
'''
Child class should override this if needed.
'''
pass
def post_scan(self):
'''
Child class should override this if needed.
'''
pass
class Plugins(object):
'''
Class to load and call plugin callback functions, handled automatically by Binwalk.scan / Binwalk.single_scan.
An instance of this class is available during a scan via the Binwalk.plugins object.
......@@ -117,12 +166,18 @@ class Plugins:
for file_name in os.listdir(plugins[key]['path']):
if file_name.endswith(self.MODULE_EXTENSION):
module = file_name[:-len(self.MODULE_EXTENSION)]
plugin = imp.load_source(module, os.path.join(plugins[key]['path'], file_name))
plugin_class = getattr(plugin, self.PLUGIN)
try:
plugin = imp.load_source(module, os.path.join(plugins[key]['path'], file_name))
plugin_class = getattr(plugin, self.PLUGIN)
plugins[key]['enabled'][module] = True
plugins[key]['modules'].append(module)
plugins[key]['enabled'][module] = True
plugins[key]['modules'].append(module)
except KeyboardInterrupt as e:
raise e
except Exception as e:
sys.stderr.write("WARNING: Error loading plugin '%s': %s\n" % (file_name, str(e)))
plugins[key]['enabled'][module] = False
try:
plugins[key]['descriptions'][module] = plugin_class.__doc__.strip().split('\n')[0]
......@@ -146,6 +201,8 @@ class Plugins:
plugin_class = getattr(plugin, self.PLUGIN)
class_instance = plugin_class(self.parent)
if not class_instance._enabled:
continue
try:
self.scan.append(getattr(class_instance, self.SCAN))
......
import binwalk.core.C
from binwalk.core.common import *
import binwalk.core.plugin
class Plugin(object):
class Plugin(binwalk.core.plugin.Plugin):
'''
Searches for and validates compress'd data.
'''
MODULES = ['Signature']
READ_SIZE = 64
COMPRESS42 = "compress42"
......@@ -13,21 +16,18 @@ class Plugin(object):
binwalk.core.C.Function(name="is_compressed", type=bool),
]
def __init__(self, module):
self.fd = None
self.comp = None
comp = None
if module.name == 'Signature':
self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)
def init(self):
self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)
def scan(self, result):
if self.comp:
if result.file and result.description.lower().startswith("compress'd data"):
fd = BlockFile(result.file.name, "r", offset=result.offset, length=self.READ_SIZE)
compressed_data = fd.read(self.READ_SIZE)
fd.close()
if result.file and result.description.lower().startswith("compress'd data"):
fd = BlockFile(result.file.name, "r", offset=result.offset, length=self.READ_SIZE)
compressed_data = fd.read(self.READ_SIZE)
fd.close()
if not self.comp.is_compressed(compressed_data, len(compressed_data)):
result.valid = False
if not self.comp.is_compressed(compressed_data, len(compressed_data)):
result.valid = False
class Plugin(object):
import binwalk.core.plugin
class Plugin(binwalk.core.plugin.Plugin):
'''
Ensures that ASCII CPIO archive entries only get extracted once.
'''
def __init__(self, module):
self.found_archive = False
self.enabled = (module.name == 'Signature')
def pre_scan(self, module):
MODULES = ['Signature']
def pre_scan(self):
# Be sure to re-set this at the beginning of every scan
self.found_archive = False
def scan(self, result):
if self.enabled and result.valid:
if result.valid:
# ASCII CPIO archives consist of multiple entries, ending with an entry named 'TRAILER!!!'.
# Displaying each entry is useful, as it shows what files are contained in the archive,
# but we only want to extract the archive when the first entry is found.
......
import os
import shutil
import binwalk.core.plugin
from binwalk.core.compat import *
from binwalk.core.common import BlockFile
class Plugin(object):
class Plugin(binwalk.core.plugin.Plugin):
'''
Finds and extracts modified LZMA files commonly found in cable modems.
Based on Bernardo Rodrigues' work: http://w00tsec.blogspot.com/2013/11/unpacking-firmware-images-from-cable.html
'''
MODULES = ['Signature']
FAKE_LZMA_SIZE = "\x00\x00\x00\x10\x00\x00\x00\x00"
SIGNATURE = "lzma compressed data"
def __init__(self, module):
def init(self):
self.original_cmd = ''
self.enabled = (module.name == 'Signature')
self.module = module
if self.enabled:
# Replace the existing LZMA extraction command with our own
rules = self.module.extractor.get_rules()
for i in range(0, len(rules)):
if rules[i]['regex'].match(self.SIGNATURE):
self.original_cmd = rules[i]['cmd']
rules[i]['cmd'] = self.lzma_cable_extractor
break
# Replace the existing LZMA extraction command with our own
rules = self.module.extractor.get_rules()
for i in range(0, len(rules)):
if rules[i]['regex'].match(self.SIGNATURE):
self.original_cmd = rules[i]['cmd']
rules[i]['cmd'] = self.lzma_cable_extractor
break
def lzma_cable_extractor(self, fname):
# Try extracting the LZMA file without modification first
......@@ -57,7 +56,7 @@ class Plugin(object):
def scan(self, result):
# The modified cable modem LZMA headers all have valid dictionary sizes and a properties byte of 0x5D.
if self.enabled and result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if result.description.lower().startswith(self.SIGNATURE) and "invalid uncompressed size" in result.description:
if "properties: 0x5D" in result.description and "invalid dictionary size" not in result.description:
result.valid = True
result.description = result.description.split("invalid uncompressed size")[0] + "missing uncompressed size"
......
import binwalk.core.C
import binwalk.core.plugin
from binwalk.core.common import BlockFile
class Plugin(object):
class Plugin(binwalk.core.plugin.Plugin):
'''
Searches for and validates zlib compressed data.
'''
MODULES = ['Signature']
MIN_DECOMP_SIZE = 16 * 1024
MAX_DATA_SIZE = 33 * 1024
......@@ -14,18 +16,13 @@ class Plugin(object):
binwalk.core.C.Function(name="is_deflated", type=int),
]
def __init__(self, module):
self.tinfl = None
self.module = module
# Only initialize this plugin if this is a signature scan
if module.name == 'Signature':
# Load libtinfl.so
self.tinfl = binwalk.core.C.Library(self.TINFL, self.TINFL_FUNCTIONS)
def init(self):
# Load libtinfl.so
self.tinfl = binwalk.core.C.Library(self.TINFL, self.TINFL_FUNCTIONS)
def scan(self, result):
# If this result is a zlib signature match, try to decompress the data
if self.tinfl and result.file and result.description.lower().startswith('zlib'):
if result.file and result.description.lower().startswith('zlib'):
# Seek to and read the suspected zlib data
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
data = fd.read(self.MAX_DATA_SIZE)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment