Commit 6c36ec68 by Peter Weidenbach

init

parents
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>common_analysis_oms</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 3.0</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">python3.4</pydev_property>
</pydev_project>
# Offline Maleware Scanner (OMS) Common Analysis Module
This module scans a file with several malware scanners installed locally.
This Plug-in is an adaption of: https://atlassian-net.cs.uni-bonn.de/stash/projects/MAL/repos/offline_malware_scanner/browse
from .oms import CommonAnalysisOMS
__all__ = [
'CommonAnalysisOMS'
]
analysis_class = CommonAnalysisOMS
# -*- coding: utf-8 -*-
from common_analysis_base import AnalysisPluginFile
from common_helper_files import get_directory_for_filename, get_version_string_from_git
import hashlib
import json
from re import findall
from subprocess import Popen, PIPE
from time import time
from os import listdir, path
from distutils import spawn
import logging
system_version = get_version_string_from_git(get_directory_for_filename(__file__))
class CommonAnalysisOMS(AnalysisPluginFile):
av_list = []
BASE_DIR = path.dirname(path.abspath(__file__))
PLUGIN_DIR = path.join(BASE_DIR, "plugins")
def __init__(self):
super(CommonAnalysisOMS, self).__init__(system_version)
self.load_plugins()
self.result_dict = {}
def analyze_file(self, file_path):
self.result_dict = self.prepare_analysis_report_dictionary()
self.scan_file(file_path)
return self.result_dict
def get_av_scan_result(self, av, filepath):
scanprocess = Popen(av["command"].replace("$filepath", filepath), shell=True, stdout=PIPE)
scanresult = scanprocess.stdout.read().decode("utf-8", errors='ignore')
logging.debug(scanresult)
return scanresult
@staticmethod
def find_malware_name(scanresult, av):
try:
return ", ".join(findall(av["re_malware_name"], scanresult))
except IndexError:
# if the result is empty, there is an error with the RE
logging.error("error with malware name regular" + "expression for {}".format(av["name"]))
return ""
def parse_scan_result(self, scanresult, av):
infection_indicator = findall(av["re_infected"], scanresult)
logging.debug("indicator: {}".format(infection_indicator))
# if there is an infected file:
if infection_indicator not in [["0"], []]:
finding = self.find_malware_name(scanresult, av)
self.result_dict["positives"] += 1
else:
finding = "clean"
print("result: " + finding)
return {"result": finding,
"detected": finding != "clean",
"version": self.get_av_scan_result(av, "--version")}
def remove_not_installed_avs(self):
for av in self.av_list[:]:
program = av["command"].split(" ")[0]
if not spawn.find_executable(program):
self.av_list.remove(av)
def execute_scans(self, filepath):
result = {}
for av in self.av_list:
print("Starting scan with {} ({}/{})".format(av["name"],
self.av_list.index(av) + 1, self.result_dict["number_of_scanners"]))
scanresult = self.get_av_scan_result(av, repr(path.abspath(filepath)))
logging.debug(repr(scanresult))
result[av["name"]] = self.parse_scan_result(scanresult, av)
return result
def load_plugins(self):
self.av_list = []
plugin_files = [f for f in listdir(self.PLUGIN_DIR)
if f[-4:] == "json"]
for f in plugin_files:
with open(path.join(self.PLUGIN_DIR, f), "r") as fp:
self.av_list.append(json.load(fp))
self.remove_not_installed_avs()
@staticmethod
def load_file_content(filepath):
with open(filepath, "rb") as fp:
return fp.read()
def get_md5(self, filepath):
m = hashlib.md5()
m.update(self.load_file_content(filepath))
return m.hexdigest()
def scan_file(self, file_to_analyze):
self.result_dict
self.result_dict["positives"] = 0
self.result_dict["scan_date"] = time()
self.result_dict["md5"] = self.get_md5(file_to_analyze)
self.result_dict["scanners"] = [av["name"] for av in self.av_list]
self.result_dict["number_of_scanners"] = len(self.result_dict["scanners"])
self.result_dict["scans"] = self.execute_scans(file_to_analyze)
logging.debug(json.dumps(self.result_dict, indent=2))
return self.result_dict
import os
import subprocess
from setuptools import setup, find_packages
setup(
name="common_analysis_oms",
version=subprocess.check_output(['git', 'describe', '--always'], cwd=os.path.dirname(os.path.abspath(__file__))).strip().decode('utf-8'),
packages=find_packages(),
install_requires=[
'common_analysis_base',
'common_helper_files'
]
)
'''
Created on Mar 24, 2016
@author: weidenba
'''
import unittest
class Test(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def testName(self):
pass
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment