Commit 43c8dd8c by devttys0

Updated modules/plugins to use ctypes wrapper classes.

parent 66408145
......@@ -6,35 +6,68 @@ import ctypes.util
from binwalk.core.compat import *
class Function(object):
'''
Container class for defining library functions.
'''
def __init__(self, **kwargs):
self.name = None
self.type = int
for (k, v) in iterator(kwargs):
setattr(self, k, v)
class FunctionHandler(object):
'''
Class for abstracting function calls via ctypes and handling Python 2/3 compatibility issues.
'''
PY2CTYPES = {
bytes : ctypes.c_char_p,
str : ctypes.c_char_p,
int : ctypes.c_int,
float : ctypes.c_float,
bool : ctypes.c_int,
None : ctypes.c_int,
}
RETVAL_CONVERTERS = {
None : int,
int : int,
float : float,
bool : bool,
str : bytes2str,
bytes : str2bytes,
}
def __init__(self, library, name, retype):
self.function = getattr(library, name)
self.retype = retype
def __init__(self, library, function):
'''
Class constructor.
@library - Library handle as returned by ctypes.cdll.LoadLibrary.
@function - An instance of the binwalk.core.C.Function class.
Returns None.
'''
self.retype = function.type
self.function = getattr(library, function.name)
if has_key(self.PY2CTYPES, self.retype):
self.function.restype = self.PY2CTYPES[self.retype]
self.retval_converter = self.RETVAL_CONVERTERS[self.retype]
else:
raise Exception("Unknown return type: '%s'" % retype)
raise Exception("Unknown return type: '%s'" % self.retype)
def run(self, *args):
'''
Executes the library function, handling Python 2/3 compatibility and properly converting the return type.
@*args - Library function arguments.
Returns the return value from the libraray function.
'''
args = list(args)
# Python3 expects a bytes object for char *'s, not a str.
# This allows us to pass either, regardless of the Python version.
for i in range(0, len(args)):
if isinstance(args[i], str):
args[i] = str2bytes(args[i])
......@@ -42,17 +75,35 @@ class Function(object):
return self.retval_converter(self.function(*args))
class Library(object):
'''
Class for loading the specified library via ctypes.
'''
def __init__(self, library, functions):
'''
Class constructor.
@library - Library name (e.g., 'magic' for libmagic).
@functions - A dictionary of function names and their return types (e.g., {'magic_buffer' : str})
Returns None.
'''
self.library = ctypes.cdll.LoadLibrary(self.find_library(library))
if not self.library:
raise Exception("Failed to load library '%s'" % library)
for (function, restype) in iterator(functions):
f = Function(self.library, function, restype)
setattr(self, function, f.run)
for function in functions:
f = FunctionHandler(self.library, function)
setattr(self, function.name, f.run)
def find_library(self, library):
'''
Locates the specified library.
@library - Library name (e.g., 'magic' for libmagic).
Returns a string to be passed to ctypes.cdll.LoadLibrary.
'''
lib_path = None
system_paths = {
'linux' : ['/usr/local/lib/lib%s.so' % library],
......
import ctypes
import ctypes.util
import binwalk.core.C
from binwalk.core.compat import *
class Magic(object):
'''
Minimalist Python wrapper around libmagic.
'''
LIBMAGIC_FUNCTIONS = {
"magic_open" : int,
"magic_load" : int,
"magic_buffer" : str,
}
LIBMAGIC_FUNCTIONS = [
binwalk.core.C.Function(name="magic_open", type=int),
binwalk.core.C.Function(name="magic_load", type=int),
binwalk.core.C.Function(name="magic_buffer", type=str),
]
MAGIC_NO_CHECK_TEXT = 0x020000
MAGIC_NO_CHECK_APPTYPE = 0x008000
......@@ -22,7 +23,7 @@ class Magic(object):
if magic_file:
self.magic_file = str2bytes(magic_file)
self.libmagic = binwalk.core.C.Library('magic', self.LIBMAGIC_FUNCTIONS)
self.libmagic = binwalk.core.C.Library("magic", self.LIBMAGIC_FUNCTIONS)
self.magic_cookie = self.libmagic.magic_open(self.MAGIC_FLAGS)
self.libmagic.magic_load(self.magic_cookie, self.magic_file)
......@@ -30,6 +31,3 @@ class Magic(object):
def buffer(self, data):
return self.libmagic.magic_buffer(self.magic_cookie, str2bytes(data), len(data))
if __name__ == "__main__":
magic = Magic()
print (magic.buffer("This is my voice on TV."))
#!/usr/bin/env python
import os
import ctypes
import ctypes.util
from binwalk.core.compat import str2bytes
import binwalk.core.C
from binwalk.core.module import Option, Kwarg, Module
class Deflate(object):
......@@ -17,37 +15,31 @@ class Deflate(object):
MIN_DECOMP_SIZE = 32*1024
DESCRIPTION = "Raw deflate compression stream"
TINFL_NAME = "tinfl"
TINFL_FUNCTIONS = [
binwalk.core.C.Function(name="is_deflated", type=int),
binwalk.core.C.Function(name="inflate_raw_file", type=None),
]
def __init__(self, module):
self.module = module
# The tinfl library is built and installed with binwalk
self.tinfl = ctypes.cdll.LoadLibrary(ctypes.util.find_library("tinfl"))
if not self.tinfl:
raise Exception("Failed to load the tinfl library")
self.tinfl = binwalk.core.C.Library(self.TINFL_NAME, self.TINFL_FUNCTIONS)
# Add an extraction rule
if self.module.extractor.enabled:
self.module.extractor.add_rule(regex='^%s' % self.DESCRIPTION.lower(), extension="deflate", cmd=self._extractor)
def pre_scan(self, fp):
if self.tinfl:
# Make sure we'll be getting enough data for a good decompression test
if fp.block_read_size < self.SIZE:
fp.set_block_size(peek=self.SIZE)
self._deflate_scan(fp)
return PLUGIN_TERMINATE
def _extractor(self, file_name):
if self.tinfl:
out_file = os.path.splitext(file_name)[0]
self.tinfl.inflate_raw_file(file_name, out_file)
out_file = os.path.splitext(file_name)[0]
self.tinfl.inflate_raw_file(file_name, out_file)
def decompress(self, data):
description = None
decomp_size = self.tinfl.is_deflated(str2bytes(data), len(data), 0)
decomp_size = self.tinfl.is_deflated(data, len(data), 0)
if decomp_size >= self.MIN_DECOMP_SIZE:
description = self.DESCRIPTION + ', uncompressed size >= %d' % decomp_size
......
import os
import re
import magic
import fnmatch
import ctypes
import ctypes.util
import fnmatch
import binwalk.core.C
import binwalk.core.common
from binwalk.core.compat import *
from binwalk.core.module import Module, Option, Kwarg
......@@ -69,14 +68,18 @@ class HashMatch(Module):
Kwarg(name='name', default=False),
Kwarg(name='max_results', default=None),
Kwarg(name='abspath', default=False),
Kwarg(name='matches', default={}),
Kwarg(name='types', default={}),
Kwarg(name='filter_by_name', default=False),
Kwarg(name='symlinks', default=False),
Kwarg(name='enabled', default=False),
]
# Requires libfuzzy.so
LIBRARY_NAME = "fuzzy"
LIBRARY_FUNCTIONS = [
binwalk.core.C.Function(name="fuzzy_hash_buf", type=int),
binwalk.core.C.Function(name="fuzzy_hash_filename", type=int),
binwalk.core.C.Function(name="fuzzy_compare", type=int),
]
# Max result is 148 (http://ssdeep.sourceforge.net/api/html/fuzzy_8h.html)
FUZZY_MAX_RESULT = 150
......@@ -93,14 +96,7 @@ class HashMatch(Module):
self.last_file1 = HashResult(None)
self.last_file2 = HashResult(None)
self.magic = magic.open(0)
self.magic.load()
self.lib = ctypes.cdll.LoadLibrary(ctypes.util.find_library(self.LIBRARY_NAME))
for k in get_keys(self.types):
for i in range(0, len(self.types[k])):
self.types[k][i] = re.compile(self.types[k][i])
self.lib = binwalk.core.C.Library(self.LIBRARY_NAME, self.LIBRARY_FUNCTIONS)
def _get_strings(self, fname):
return ''.join(list(binwalk.core.common.strings(fname, minimum=10)))
......@@ -159,23 +155,23 @@ class HashMatch(Module):
if file1_dup:
hash1 = self.last_file1.hash
else:
status |= self.lib.fuzzy_hash_buf(str2bytes(file1_strings), len(file1_strings), hash1)
status |= self.lib.fuzzy_hash_buf(file1_strings, len(file1_strings), hash1)
if file2_dup:
hash2 = self.last_file2.hash
else:
status |= self.lib.fuzzy_hash_buf(str2bytes(file2_strings), len(file2_strings), hash2)
status |= self.lib.fuzzy_hash_buf(file2_strings, len(file2_strings), hash2)
else:
if file1_dup:
hash1 = self.last_file1.hash
else:
status |= self.lib.fuzzy_hash_filename(str2bytes(file1), hash1)
status |= self.lib.fuzzy_hash_filename(file1, hash1)
if file2_dup:
hash2 = self.last_file2.hash
else:
status |= self.lib.fuzzy_hash_filename(str2bytes(file2), hash2)
status |= self.lib.fuzzy_hash_filename(file2, hash2)
if status == 0:
if not file1_dup:
......@@ -201,7 +197,7 @@ class HashMatch(Module):
def _get_file_list(self, directory):
'''
Generates a directory tree, including/excluding files as specified in self.matches and self.types.
Generates a directory tree.
@directory - The root directory to start from.
......@@ -219,38 +215,7 @@ class HashMatch(Module):
# Get a list of files, with or without symlinks as specified during __init__
files = [os.path.join(root, f) for f in files if self.symlinks or not os.path.islink(f)]
# If no filters were specified, return all files
if not self.types and not self.matches:
file_list += files
else:
# Filter based on the file type, as reported by libmagic
if self.types:
for f in files:
for (include, regex_list) in iterator(self.types):
for regex in regex_list:
try:
magic_result = self.magic.file(os.path.join(directory, f)).lower()
except Exception as e:
magic_result = ''
match = regex.match(magic_result)
# If this matched an include filter, or didn't match an exclude filter
if (match and include) or (not match and not include):
file_list.append(f)
# Filter based on file name
if self.matches:
for (include, file_filter_list) in iterator(self.matches):
for file_filter in file_filter_list:
matching_files = fnmatch.filter(files, file_filter)
# If this is an include filter, add all matching files to the list
if include:
file_list += matching_files
# Else, this add all files except those that matched to the list
else:
file_list += list(set(files) - set(matching_files))
file_list += files
return set(file_list)
......
import ctypes
import ctypes.util
import binwalk.core.C
from binwalk.core.common import *
class Plugin(object):
......@@ -9,23 +8,26 @@ class Plugin(object):
READ_SIZE = 64
COMPRESS42 = "compress42"
COMPRESS42_FUNCTIONS = [
binwalk.core.C.Function(name="is_compressed", type=bool),
]
def __init__(self, module):
self.fd = None
self.comp = None
if module.name == 'Signature':
self.comp = ctypes.cdll.LoadLibrary(ctypes.util.find_library("compress42"))
self.comp = binwalk.core.C.Library(self.COMPRESS42, self.COMPRESS42_FUNCTIONS)
def scan(self, result):
if self.comp:
if result.file and result.description.lower().startswith("compress'd data"):
fd = BlockFile(result.file.name, "r")
fd.seek(result.offset)
fd = BlockFile(result.file.name, "r", offset=result.offset, length=self.READ_SIZE)
compressed_data = fd.read(self.READ_SIZE)
fd.close()
if not self.comp.is_compressed(compressed_data, len(compressed_data)):
result.valid = False
fd.close()
import ctypes
import ctypes.util
from binwalk.core.compat import str2bytes
import binwalk.core.C
from binwalk.core.common import BlockFile
class Plugin(object):
......@@ -11,6 +9,11 @@ class Plugin(object):
MIN_DECOMP_SIZE = 16 * 1024
MAX_DATA_SIZE = 33 * 1024
TINFL = "tinfl"
TINFL_FUNCTIONS = [
binwalk.core.C.Function(name="is_deflated", type=int),
]
def __init__(self, module):
self.tinfl = None
self.module = module
......@@ -18,15 +21,14 @@ class Plugin(object):
# Only initialize this plugin if this is a signature scan
if module.name == 'Signature':
# Load libtinfl.so
self.tinfl = ctypes.cdll.LoadLibrary(ctypes.util.find_library('tinfl'))
self.tinfl = binwalk.core.C.Library(self.TINFL, self.TINFL_FUNCTIONS)
def scan(self, result):
# If this result is a zlib signature match, try to decompress the data
if self.tinfl and result.file and result.description.lower().startswith('zlib'):
# Seek to and read the suspected zlib data
fd = self.module.config.open_file(result.file.name, offset=result.offset, length=self.MAX_DATA_SIZE)
# Python3 ctypes needs a bytes object, not a str
data = str2bytes(fd.read(self.MAX_DATA_SIZE))
data = fd.read(self.MAX_DATA_SIZE)
fd.close()
# Check if this is valid zlib data
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment