Commit 91cce1f0 by Peter Weidenbach

compile functionality added

parent e4a84d2c
# Common Helper Yara # Common Helper Yara
Yara command line binding Yara command line binding providing scan and compile features.
## Requirements ## Requirements
* [YARA >= 3.6](](https://virustotal.github.io/yara/) * [YARA >= 3.6](https://virustotal.github.io/yara/)
def convert_external_variables(ext_var_dict):
output = []
for ext_var in ext_var_dict:
output.append('-d {}={}'.format(ext_var, ext_var_dict[ext_var]))
return " ".join(sorted(output))
from common_helper_files.fail_safe_file_operations import get_files_in_dir
import logging
import subprocess
from tempfile import NamedTemporaryFile
from .common import convert_external_variables
def compile_rules(input_dir, out_file, external_variables={}):
'''
compile yara files in input dir
:param input_dir: directory with yara rules
:type input_dir: string
:param out_file: path to store the compiled yara rules
:type out_file: string
:return: None
'''
with NamedTemporaryFile(mode='w') as tmp_file:
_create_joint_signature_file(input_dir, tmp_file)
_create_compiled_signature_file(out_file, tmp_file, external_variables)
return None
def _create_joint_signature_file(directory, tmp_file):
all_signatures = list()
for signature_file in get_files_in_dir(directory):
with open(signature_file, 'rb') as fd:
all_signatures.append(fd.read())
with open(tmp_file.name, 'wb') as fd:
fd.write(b'\x0a'.join(all_signatures))
def _create_compiled_signature_file(out_file, tmp_file, external_variables):
variables = convert_external_variables(external_variables)
try:
subprocess.run('yarac {} {} {}'.format(variables, tmp_file.name, out_file), shell=True, check=True)
except subprocess.CalledProcessError:
logging.error('Creation of {} failed !!'.format(out_file))
import subprocess from subprocess import check_output, CalledProcessError, STDOUT
import sys import sys
import re import re
import json import json
import logging import logging
from .common import convert_external_variables
def scan(signature_path, file_path, external_variables={}):
def scan(signature_path, file_path, external_variables={}, recursive=False):
''' '''
Scan files and return matches Scan files and return matches
...@@ -15,23 +17,20 @@ def scan(signature_path, file_path, external_variables={}): ...@@ -15,23 +17,20 @@ def scan(signature_path, file_path, external_variables={}):
:type file_path: string :type file_path: string
:return: dict :return: dict
''' '''
variables = _convert_external_variables(external_variables) variables = convert_external_variables(external_variables)
with subprocess.Popen('yara {} --print-meta --print-strings {} {}'.format(variables, signature_path, file_path), shell=True, stdout=subprocess.PIPE) as process: recursive = '-r' if recursive else ''
output = process.stdout.read().decode() try:
scan_result = check_output("yara {} {} --print-meta --print-strings {} {}".format(variables, recursive, signature_path, file_path), shell=True, stderr=STDOUT)
except CalledProcessError as e:
logging.error("There seems to be an error in the rule file:\n{}".format(e.output.decode()))
return {}
try: try:
return _parse_yara_output(output) return _parse_yara_output(scan_result.decode())
except Exception as e: except Exception as e:
logging.error('Could not parse yara result: {} {}'.format(sys.exc_info()[0].__name__, e)) logging.error('Could not parse yara result: {} {}'.format(sys.exc_info()[0].__name__, e))
return {} return {}
def _convert_external_variables(ext_var_dict):
output = []
for ext_var in ext_var_dict:
output.append('-d {}={}'.format(ext_var, ext_var_dict[ext_var]))
return " ".join(sorted(output))
def _parse_yara_output(output): def _parse_yara_output(output):
resulting_matches = dict() resulting_matches = dict()
......
from setuptools import setup, find_packages from setuptools import setup, find_packages
VERSION = "0.1" VERSION = "0.2"
setup( setup(
name="common_helper_yara", name="common_helper_yara",
version=VERSION, version=VERSION,
packages=find_packages(), packages=find_packages(),
install_requires=[ install_requires=[
'yara-python >= 3.6' 'common_helper_files'
],
dependency_links=[
'git+ssh://git@atlassian-net.cs.uni-bonn.de:7999/ch/common_helper_files.git#common_helper_files'
], ],
description="Yara command line binding", description="Yara command line binding",
author="Fraunhofer FKIE", author="Fraunhofer FKIE",
......
rule lighttpd rule lighttpd_simple
{ {
meta: meta:
software_name = "lighttpd" software_name = "lighttpd"
......
import unittest
from common_helper_yara.common import convert_external_variables
class TestYaraCommon(unittest.TestCase):
def test_convert_external_variables(self):
self.assertEqual(convert_external_variables({'a': 'b'}), '-d a=b', 'converted output not correct')
self.assertEqual(convert_external_variables({'a': 1, 'b': 'c'}), '-d a=1 -d b=c', 'converted output not correct')
import os
import unittest
from common_helper_yara.yara_compile import compile_rules
from common_helper_yara.yara_scan import scan
from tempfile import TemporaryDirectory
DIR_OF_CURRENT_FILE = os.path.dirname(os.path.abspath(__file__))
class TestYaraCompile(unittest.TestCase):
def test_compile_and_scan(self):
tmp_dir = TemporaryDirectory(prefix="common_helper_yara_test_")
input_dir = os.path.join(DIR_OF_CURRENT_FILE, 'data/rules')
signature_file = os.path.join(tmp_dir.name, 'test.yc')
data_files = os.path.join(DIR_OF_CURRENT_FILE, 'data/data_files')
compile_rules(input_dir, signature_file, external_variables={'test_flag': 'true'})
self.assertTrue(os.path.exists(signature_file), "file not created")
result = scan(signature_file, data_files, recursive=True)
self.assertIn('lighttpd', result.keys(), "at least one match missing")
self.assertIn('lighttpd_simple', result.keys(), "at least one match missing")
import os import os
import unittest import unittest
from common_helper_yara.yara_scan import _parse_yara_output, _convert_external_variables, scan from common_helper_yara.yara_scan import _parse_yara_output, scan
DIR_OF_CURRENT_FILE = os.path.dirname(os.path.abspath(__file__)) DIR_OF_CURRENT_FILE = os.path.dirname(os.path.abspath(__file__))
...@@ -18,13 +17,9 @@ class TestYaraScan(unittest.TestCase): ...@@ -18,13 +17,9 @@ class TestYaraScan(unittest.TestCase):
self.assertIn('PgpPublicKeyBlock', matches.keys(), 'Pgp block should have been matched') self.assertIn('PgpPublicKeyBlock', matches.keys(), 'Pgp block should have been matched')
self.assertIn(0, matches['PgpPublicKeyBlock']['strings'][0], 'first block should start at 0x0') self.assertIn(0, matches['PgpPublicKeyBlock']['strings'][0], 'first block should start at 0x0')
def test_convert_external_variables(self):
self.assertEqual(_convert_external_variables({'a': 'b'}), '-d a=b', 'converted output not correct')
self.assertEqual(_convert_external_variables({'a': 1, 'b': 'c'}), '-d a=1 -d b=c', 'converted output not correct')
def test_scan(self): def test_scan(self):
signature_file = os.path.join(DIR_OF_CURRENT_FILE, 'data', 'signatures.yara') signature_file = os.path.join(DIR_OF_CURRENT_FILE, 'data/rules', 'signatures.yara')
scan_file = os.path.join(DIR_OF_CURRENT_FILE, 'data', 'scan_file') scan_file = os.path.join(DIR_OF_CURRENT_FILE, 'data/data_files', 'scan_file')
result = scan(signature_file, scan_file) result = scan(signature_file, scan_file)
self.assertIsInstance(result, dict, "result is not a dict") self.assertIsInstance(result, dict, "result is not a dict")
...@@ -32,11 +27,18 @@ class TestYaraScan(unittest.TestCase): ...@@ -32,11 +27,18 @@ class TestYaraScan(unittest.TestCase):
self.assertEqual(result['another_test_rule']['meta']['description'], 'test rule', 'meta data not correct') self.assertEqual(result['another_test_rule']['meta']['description'], 'test rule', 'meta data not correct')
def test_scan_ext_variable_and_magic(self): def test_scan_ext_variable_and_magic(self):
signature_file = os.path.join(DIR_OF_CURRENT_FILE, 'data', 'signatures_ext_var.yara') signature_file = os.path.join(DIR_OF_CURRENT_FILE, 'data/rules', 'signatures_ext_var.yara')
scan_file = os.path.join(DIR_OF_CURRENT_FILE, 'data', 'scan_file') scan_file = os.path.join(DIR_OF_CURRENT_FILE, 'data/data_files', 'scan_file')
result = scan(signature_file, scan_file, external_variables={'test_flag': "true"}) result = scan(signature_file, scan_file, external_variables={'test_flag': "true"})
self.assertEqual(len(result), 1, "number of results not correct") self.assertEqual(len(result), 1, "number of results not correct")
result = scan(signature_file, scan_file, external_variables={'test_flag': "false"}) result = scan(signature_file, scan_file, external_variables={'test_flag': "false"})
self.assertEqual(len(result), 0, "number of results not correct") self.assertEqual(len(result), 0, "number of results not correct")
def test_scan_recursive(self):
signature_file = os.path.join(DIR_OF_CURRENT_FILE, 'data/rules', 'signatures.yara')
scan_file = os.path.join(DIR_OF_CURRENT_FILE, 'data/data_files')
result = scan(signature_file, scan_file, recursive=True)
self.assertEqual(len(result['another_test_rule']['strings']), 2, 'string in second file not found')
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment