Commit 3a8c31a5 by Fabian Rump Committed by Timm Behner

Initial commit

parents
# general things to ignore
build/
dist/
*.egg-info/
*.egg
*.py[cod]
__pycache__/
*.so
*~
from .analysis_plugin import AnalysisPlugin, AnalysisPluginDomain, AnalysisPluginFile, AnalysisPluginIP, AnalysisPluginURL, DomainAnalysisMixin, FileAnalysisMixin, IPAnalysisMixin, URLAnalysisMixin
from .analysis_plugin import AnalysisPluginString, StringAnalysisMixin
__all__ = [
'AnalysisPlugin',
'AnalysisPluginDomain',
'AnalysisPluginFile',
'AnalysisPluginIP',
'AnalysisPluginURL',
'DomainAnalysisMixin',
'FileAnalysisMixin',
'IPAnalysisMixin',
'URLAnalysisMixin'
'AnalysisPluginString',
'StringAnalysisMixin',
]
import datetime
class AnalysisPlugin:
"""
All analysis plugins which are implemented according to our common code guidelines should be derived from this class.
This way we can make sure that the implemented plugins follow the defined criteria and can be used in different applications.
Note that this class does neither define nor implement any specific analysis method. Instead it provides a common interface to find out which
analysis methods are supported by the derived analysis plugin class. Specifically, the user of any derived class may call the ``can_analyze_xyz`` methods
to find out which types of analysis can be executed by the analysis plugin.
:ivar plugin_version: The version number of the analysis plugin
:ivar system_version: Optional version number of an external system which is used by the plugin, i.e. the version of an AV signature database.
"""
def __init__(self, plugin_version, system_version=None):
"""
Initialize the analysis plugin.
The __init__ method of the superclass will set the ``plugin_version`` and optionally the ``system_version`` variables.
:param plugin_version: plugin_version: The version number of the analysis plugin
:param system_version: Optional version number of an external system which is used by the plugin, i.e. the version of an AV signature database.
"""
self.plugin_version = plugin_version
self.system_version = system_version
def can_analyze_file(self):
"""
Returns true if the analysis plugin supports the analysis of files.
:return: True if file analysis is supported, False otherwise
"""
return hasattr(self, "analyze_file") and callable(getattr(self, "analyze_file"))
def can_analyze_url(self):
"""
Returns true if the analysis plugin supports the analysis of URLs.
:return: True if URL analysis is supported, False otherwise
"""
return hasattr(self, "analyze_url") and callable(getattr(self, "analyze_url"))
def can_analyze_ip(self):
"""
Returns true if the analysis plugin supports the analysis of IP addresses.
:return: True if IP address analysis is supported, False otherwise
"""
return hasattr(self, "analyze_ip") and callable(getattr(self, "analyze_ip"))
def can_analyze_domain(self):
"""
Returns true if the analysis plugin supports the analysis of domain names.
:return: True if domain name analysis is supported, False otherwise
"""
return hasattr(self, "analyze_domain") and callable(getattr(self, "analyze_domain"))
def prepare_analysis_report_dictionary(self):
"""
Create and return a dictionary that can be used to return the analysis report.
:return: Dictionary with analysis metadata
"""
result = {
'plugin_version': self.plugin_version,
'analysis_date': datetime.datetime.now()
}
if self.system_version is not None:
result['system_version'] = self.system_version
return result
class FileAnalysisMixin:
"""
Simple mixin which should be used by analysis plugins when implementing file analysis.
"""
def analyze_file(self, file_path):
"""
Placeholder function for file analysis. This function should be overwritten by the analysis plugin to execute the
plugin-specific analysis procedure and return an analysis report. Will raise a ``NotImplementedError`` if used when not overwritten.
:param file_path: Path to the file which should be analyzed. Can be an absolute path or relative to the current working directory.
:type file_path: str
:return: Dictionary with analysis report
"""
raise NotImplementedError('This method is not implemented.')
class URLAnalysisMixin:
"""
Simple mixin which should be used by analysis plugins when implementing URL analysis.
"""
def analyze_url(self, url):
"""
Placeholder function for URL analysis. This function should be overwritten by the analysis plugin to execute the
plugin-specific analysis procedure and return an analysis report. Will raise a ``NotImplementedError`` if used when not overwritten.
:param url: URL that should be analyzed
:type url: str
:return: Dictionary with analysis report
"""
raise NotImplementedError('This method is not implemented.')
class IPAnalysisMixin:
"""
Simple mixin which should be used by analysis plugins when implementing IP address analysis.
"""
def analyze_ip(self, ip):
"""
Placeholder function for IP address analysis. This function should be overwritten by the analysis plugin to execute the
plugin-specific analysis procedure and return an analysis report. Will raise a ``NotImplementedError`` if used when not overwritten.
:param ip: IPv4 address or IPv6 address that should be analyzed
:type ip: IPv4Address or IPv6Address
:return: Dictionary with analysis report
"""
raise NotImplementedError('This method is not implemented.')
class StringAnalysisMixin:
"""
Simple mixin which should be used by analysis plugins when implementing string analysis.
"""
def analyze_string(self, string):
"""
Placeholder function for string analysis. This function should be overwritten by the analysis plugin to execute the
plugin-specific analysis procedure and return an analysis report. Will raise a ``NotImplementedError`` if used when not overwritten.
:param string: string that should be analyzed
:type string: string
:return: Dictionary with analysis report
"""
raise NotImplementedError('This method is not implemented.')
class DomainAnalysisMixin:
"""
Simple mixin which should be used by analysis plugins when implementing domain name analysis.
"""
def analyze_domain(self, domain):
"""
Placeholder function for domain name analysis. This function should be overwritten by the analysis plugin to execute the
plugin-specific analysis procedure and return an analysis report. Will raise a ``NotImplementedError`` if used when not overwritten.
:param domain: Domain name which should be analyzed.
:type domain: str
:return: Dictionary with analysis report
"""
raise NotImplementedError('This method is not implemented.')
class AnalysisPluginFile(AnalysisPlugin, FileAnalysisMixin):
"""
Convenience class that can be used as the superclass for file analysis plugins. Derived from :py:class:`.AnalysisPlugin` and :py:class:`.FileAnalysisMixin`
"""
pass
class AnalysisPluginURL(AnalysisPlugin, URLAnalysisMixin):
"""
Convenience class that can be used as the superclass for URL analysis plugins. Derived from :py:class:`.AnalysisPlugin` and :py:class:`.URLAnalysisMixin`
"""
pass
class AnalysisPluginIP(AnalysisPlugin, IPAnalysisMixin):
"""
Convenience class that can be used as the superclass for IP address analysis plugins. Derived from :py:class:`.AnalysisPlugin` and :py:class:`.IPAnalysisMixin`
"""
pass
class AnalysisPluginDomain(AnalysisPlugin, DomainAnalysisMixin):
"""
Convenience class that can be used as the superclass for domain name analysis plugins. Derived from :py:class:`.AnalysisPlugin` and :py:class:`.DomainAnalysisMixin`
"""
pass
class AnalysisPluginString(AnalysisPlugin, StringAnalysisMixin):
"""
Convenience class that can be used as the superclass for string analysis plugins. Derived from :py:class:`.AnalysisPlugin` and :py:class:`.StringAnalysisMixin`
"""
pass
import argparse
import importlib
import importlib.machinery
import logging
import pprint
PROGRAM_NAME = 'Common Analysis CLI test client'
PROGRAM_VERSION = '1.0'
PROGRAM_DESCRIPTION = 'Simple test client to run common analysis modules from the command line'
def _setup_argparser():
parser = argparse.ArgumentParser(description='{} - {}'.format(PROGRAM_NAME, PROGRAM_DESCRIPTION))
parser.add_argument('-V', '--version', action='version', version='{} {}'.format(PROGRAM_NAME, PROGRAM_VERSION))
parser.add_argument('-l', '--log_file',
help='path to log file')
parser.add_argument('-L', '--log_level',
help='define the log level [DEBUG,INFO,WARNING,ERROR]',
default='WARNING')
parser.add_argument('-a', '--analysis-module',
help='name of the analysis module which should be loaded')
parser.add_argument('-f', '--file',
help='path of the file that should be analyzed by the analysis module')
parser.add_argument('-u', '--url',
help='URL that should be analyzed by the analysis module')
parser.add_argument('-i', '--ip',
help='IPv4/IPv6 that should be analyzed by the analysis module')
parser.add_argument('-d', '--domain',
help='Domain that should be analyzed by the analysis module')
return parser.parse_args()
def _setup_logging(args):
log_level = getattr(logging, args.log_level.upper(), None)
log_file = getattr(args, 'log_file', None)
log_format = logging.Formatter(fmt='[%(asctime)s][%(module)s][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)
console_log = logging.StreamHandler()
console_log.setLevel(logging.INFO)
console_log.setFormatter(log_format)
logger.addHandler(console_log)
if log_file:
file_log = logging.FileHandler(args.log_file)
file_log.setLevel(log_level)
file_log.setFormatter(log_format)
logger.addHandler(file_log)
def _load_analysis_class_from_module(args):
logger = logging.getLogger('')
analysis_module_name = getattr(args, 'analysis_module', None)
if analysis_module_name is None:
logger.error('No analysis module specified. Terminating.')
exit(1)
try:
logger.info('Initializing analysis module: %s', analysis_module_name)
module = importlib.import_module(analysis_module_name)
return getattr(module, 'analysis_class')
except ImportError as e:
logger.error('Module %s can not be imported. Terminating.', analysis_module_name)
raise e
except AttributeError as e:
logger.error('No analysis class found in module %s. Terminating.', analysis_module_name)
raise e
def _analyze_file(analyzer, file_path):
logger = logging.getLogger('')
if analyzer.can_analyze_file():
logger.info('Analyzing file: %s', file_path)
return analyzer.analyze_file(file_path)
else:
logger.warn('File analysis requested, but analysis module can not analyze files. Skipping.')
def _analyze_url(analyzer, url):
logger = logging.getLogger('')
if analyzer.can_analyze_url():
logger.info('Analyzing URL: %s', url)
return analyzer.analyze_url(url)
else:
logger.warn('URL analysis requested, but analysis module can not analyze URLs. Skipping.')
def _analyze_ip(analyzer, ip):
logger = logging.getLogger('')
if analyzer.can_analyze_ip():
logger.info('Analyzing IP: %s', ip)
return analyzer.analyze_ip(ip)
else:
logger.warn('IP analysis requested, but analysis module can not analyze IPs. Skipping.')
def _analyze_domain(analyzer, domain):
logger = logging.getLogger('')
if analyzer.can_analyze_domain():
logger.info('Analyzing domain: %s', domain)
return analyzer.analyze_domain(domain)
else:
logger.warn('Domain analysis requested, but analysis module can not analyze domains. Skipping.')
def _pretty_print(object):
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(object)
def _analyze(args, analysis_class):
analyzer = analysis_class()
file_path = getattr(args, 'file', None)
url = getattr(args, 'url', None)
ip = getattr(args, 'ip', None)
domain = getattr(args, 'domain', None)
if file_path:
result = _analyze_file(analyzer, file_path)
_pretty_print(result)
if url:
result = _analyze_url(analyzer, url)
_pretty_print(result)
if ip:
result = _analyze_ip(analyzer, ip)
_pretty_print(result)
if domain:
result = _analyze_domain(analyzer, domain)
_pretty_print(result)
def _main():
args = _setup_argparser()
_setup_logging(args)
analysis_class = _load_analysis_class_from_module(args)
_analyze(args, analysis_class)
exit(0)
if __name__ == '__main__':
_main()
import os
import subprocess
from setuptools import setup, find_packages
setup(
name="common_analysis_base",
version=subprocess.check_output(['git', 'describe', '--always'], cwd=os.path.dirname(os.path.abspath(__file__))).strip().decode('utf-8'),
packages=find_packages(),
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment