Unverified Commit 69790b89 by Jörg Stucke Committed by GitHub

Merge pull request #4 from fkie-cad/yara-3.9-support

Yara 3.9 support
parents 2dd5ad58 5bdaf804
from .common import get_yara_version
from .yara_scan import scan
from .yara_compile import compile_rules
from .yara_interpretation import get_all_matched_strings
......@@ -5,5 +6,6 @@ from .yara_interpretation import get_all_matched_strings
# Names exported as the public API of the package.
__all__ = [
    'scan',
    'compile_rules',
    'get_all_matched_strings',
    'get_yara_version',
]
def convert_external_variables(ext_var_dict):
    '''
    Convert a dict of external variables to yara command line options.

    Each entry becomes one `-d name=value` option. The options are sorted and joined with
    single spaces. Every `name=value` pair is quoted for POSIX shells so that values containing
    whitespace or shell metacharacters stay a single argument.

    :param ext_var_dict: mapping of variable name to value
    :return: the option string (empty if no variables are given)
    '''
    from shlex import quote  # local import: only needed here

    definitions = sorted('{}={}'.format(name, ext_var_dict[name]) for name in ext_var_dict)
    return ' '.join('-d {}'.format(quote(definition)) for definition in definitions)
import logging
from distutils.version import LooseVersion
from shlex import split
from subprocess import check_output
from typing import Any, Dict, Optional
def convert_external_variables(ext_var_dict: Dict[str, Any]) -> str:
    '''
    Convert a dict of external variables to yara command line options.

    Each entry becomes one `-d name=value` option. The options are sorted and joined with
    single spaces. Every `name=value` pair is quoted for POSIX shells so that values containing
    whitespace or shell metacharacters stay a single argument, both when the string is passed
    to a shell and when it is tokenised with `shlex.split`.

    :param ext_var_dict: mapping of variable name to value
    :return: the option string (empty if no variables are given)
    '''
    from shlex import quote  # local import: only needed here

    definitions = sorted(f'{variable}={value}' for variable, value in ext_var_dict.items())
    return ' '.join(f'-d {quote(definition)}' for definition in definitions)
def get_yara_version() -> Optional[LooseVersion]:
    '''
    Query the installed YARA binary for its version.

    Runs `yara --version` and wraps the stripped output in a `distutils.version.LooseVersion`.
    If the `yara` executable cannot be found, a warning is logged instead.

    :return: The installed YARA version or `None` if YARA is not found
    '''
    try:
        raw_output = check_output(split('yara --version'))
    except FileNotFoundError:
        logging.warning('YARA not found. Is YARA installed?', exc_info=True)
        return None
    version_string = raw_output.decode().strip()
    return LooseVersion(version_string)
from common_helper_files.fail_safe_file_operations import get_files_in_dir
from pathlib import Path
from shlex import split
from typing import Dict, Optional, Any, Union
import logging
import subprocess
from tempfile import NamedTemporaryFile
......@@ -6,34 +9,36 @@ from tempfile import NamedTemporaryFile
from .common import convert_external_variables
def compile_rules(
    input_dir: Union[str, Path],
    out_file: Union[str, Path],
    external_variables: Optional[Dict[str, Any]] = None,
) -> None:
    '''
    Compile the yara rule files of a directory into one compiled rule file.

    The rule files are first joined into a temporary file, which is then compiled to `out_file`.

    :param input_dir: directory with yara rules
    :param out_file: path to store the compiled yara rules
    :param external_variables: define external variables
    :return: None
    '''
    # `None` instead of a mutable `{}` default, so no dict is shared between calls
    if external_variables is None:
        external_variables = {}
    with NamedTemporaryFile(mode='w') as tmp_file:
        _create_joint_signature_file(Path(input_dir), tmp_file)
        _create_compiled_signature_file(out_file, tmp_file, external_variables)
def _create_joint_signature_file(directory, tmp_file):
    # Read every file reported by get_files_in_dir() in sorted order ...
    rule_blobs = []
    for rule_path in sorted(get_files_in_dir(directory)):
        with open(rule_path, 'rb') as rule_fd:
            rule_blobs.append(rule_fd.read())
    # ... and write them, separated by newlines, to the path of the temporary file
    joint_content = b'\x0a'.join(rule_blobs)
    with open(tmp_file.name, 'wb') as joint_fd:
        joint_fd.write(joint_content)
def _create_joint_signature_file(directory: Path, tmp_file: NamedTemporaryFile):
all_signatures = [
signature_file.read_bytes()
for signature_file in sorted(directory.iterdir())
]
Path(tmp_file.name).write_bytes(b'\n'.join(all_signatures))
def _create_compiled_signature_file(out_file: Path, tmp_file: NamedTemporaryFile, external_variables: dict):
    '''
    Compile the joint signature file with `yarac`.

    A failing `yarac` call is logged as an error and not re-raised.

    :param out_file: path to store the compiled yara rules
    :param tmp_file: temporary file holding the joint rules; only its `name` is used
    :param external_variables: external variables passed to `yarac` as `-d` options
    '''
    variables = convert_external_variables(external_variables)
    # Only the option string is tokenised; the paths are passed as separate argv entries
    # so that whitespace inside them cannot split them into several arguments.
    command = ['yarac', *split(variables), tmp_file.name, str(out_file)]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError:
        logging.error(f'Creation of {out_file} failed!')
from typing import Set


def get_all_matched_strings(yara_result_dict: dict) -> Set[str]:
    '''
    Get all strings matched by the yara rules

    :param yara_result_dict: a yara result dict
    :return: a set of all matched strings
    '''
    return {
        string
        for matched_rule in yara_result_dict.values()
        for string in _get_matched_strings_of_single_rule(matched_rule)
    }
def _get_matched_strings_of_single_rule(yara_match):
matched_strings = set()
for string_item in yara_match['strings']:
matched_strings.add(string_item[2].decode('utf-8', 'replace'))
return matched_strings
return {
string_item[2].decode('utf-8', 'replace')
for string_item in yara_match['strings']
}
from subprocess import check_output, CalledProcessError, STDOUT
import sys
import re
import json
import logging
import re
from pathlib import Path
from subprocess import check_output, CalledProcessError, STDOUT
from typing import Optional, Any, Dict, Union
from .common import convert_external_variables
def scan(
    signature_path: Union[str, Path],
    file_path: Union[str, Path],
    external_variables: Optional[Dict[str, Any]] = None,
    recursive: bool = False,
    compiled: bool = False
) -> dict:
    '''
    Scan files and return matches

    Errors are logged and not raised: if the yara call fails or its output cannot be parsed,
    an empty dict is returned.

    :param signature_path: path to signature file
    :param file_path: files to scan
    :param external_variables: define external variables
    :param recursive: scan recursively
    :param compiled: rule is in compiled form (Yara >= 4 only!)
    :return: a dict containing the scan results
    '''
    # `None` instead of a mutable `{}` default, so no dict is shared between calls
    if external_variables is None:
        external_variables = {}
    variables = convert_external_variables(external_variables)
    recursive_flag = '-r' if recursive else ''
    compiled_flag = '-C' if compiled else ''
    try:
        # NOTE(review): the command is run through a shell and both paths are inserted
        # unquoted, so paths with whitespace or shell metacharacters are not handled.
        # Left unchanged because callers may rely on shell expansion -- confirm.
        command = f'yara {variables} {recursive_flag} {compiled_flag} -m -s {signature_path} {file_path}'
        scan_result = check_output(command, shell=True, stderr=STDOUT)
        return _parse_yara_output(scan_result.decode())
    except CalledProcessError as e:
        logging.error(f'There seems to be an error in the rule file:\n{e.output.decode()}', exc_info=True)
        return {}
    except Exception as e:
        logging.error(f'Could not parse yara result: {e}', exc_info=True)
        return {}
......@@ -45,12 +57,12 @@ def _parse_yara_output(output):
def _split_output_in_rules_and_matches(output):
split_regex = re.compile(r'\n*.*\[.*\]\s\/.+\n*')
split_regex = re.compile(r'\n*.*\[.*]\s/.+\n*')
match_blocks = split_regex.split(output)
while '' in match_blocks:
match_blocks.remove('')
rule_regex = re.compile(r'(.*)\s\[(.*)\]\s([\.\.\/]|[\/]|[\.\/])(.+)')
rule_regex = re.compile(r'(.*)\s\[(.*)]\s(?=/|./|../)(.+)')
rules = rule_regex.findall(output)
assert len(match_blocks) == len(rules)
......@@ -58,17 +70,15 @@ def _split_output_in_rules_and_matches(output):
def _append_match_to_result(match, resulting_matches, rule):
    '''
    Add a single string match to the result entry of its rule.

    The entry for the rule is created on first use; `resulting_matches` is modified in place.

    :param match: 4-tuple; the last three elements are the hex offset, the tag and the matched string
    :param resulting_matches: dict of rule name -> result entry
    :param rule: 3-tuple; the first two elements are the rule name and the meta data string
    '''
    assert len(rule) == 3, f'rule was parsed incorrectly: {rule}'
    rule_name, meta_string, _ = rule
    assert len(match) == 4, f'match was parsed incorrectly: {match}'
    _, offset, matched_tag, matched_string = match

    meta_dict = _parse_meta_data(meta_string)

    # setdefault() stores the new entry itself, so no re-assignment is needed afterwards
    this_match = resulting_matches.setdefault(rule_name, dict(rule=rule_name, matches=True, strings=[], meta=meta_dict))

    # the offset is a hexadecimal string
    this_match['strings'].append((int(offset, 16), matched_tag, matched_string.encode()))
def _parse_meta_data(meta_data_string):
......@@ -79,8 +89,8 @@ def _parse_meta_data(meta_data_string):
for item in meta_data_string.split(','):
if '=' in item:
key, value = item.split('=', maxsplit=1)
value = json.loads(value) if value in ['true', 'false'] else value.strip('\"')
value = json.loads(value) if value in ['true', 'false'] else value.strip('"')
meta_data[key] = value
else:
logging.warning('Malformed meta string \'{}\''.format(meta_data_string))
logging.warning(f'Malformed meta string \'{meta_data_string}\'')
return meta_data
from setuptools import setup, find_packages
VERSION = "0.2.1"
VERSION = "0.3"
setup(
name="common_helper_yara",
version=VERSION,
packages=find_packages(),
install_requires=[
'common_helper_files @ git+https://github.com/fkie-cad/common_helper_files.git'
],
extras_require={
'dev': [
'pytest',
......
import unittest
from distutils.version import LooseVersion
from common_helper_yara.common import convert_external_variables
import pytest
import common_helper_yara.common as common
from common_helper_yara.common import convert_external_variables, get_yara_version
class TestYaraCommon(unittest.TestCase):

    def test_convert_external_variables(self):
        # (input dict, expected option string)
        cases = [
            ({'a': 'b'}, '-d a=b'),
            ({'a': 1, 'b': 'c'}, '-d a=1 -d b=c'),
        ]
        for variables, expected in cases:
            self.assertEqual(convert_external_variables(variables), expected, 'converted output not correct')
@pytest.mark.parametrize('test_input, expected_output', [
    ({'a': 'b'}, '-d a=b'),
    ({'a': 1, 'b': 'c'}, '-d a=1 -d b=c'),
])
def test_convert_external_variables(test_input, expected_output):
    # every dict entry must become one `-d name=value` option
    converted = convert_external_variables(test_input)
    assert converted == expected_output
def test_get_yara_version():
    # the installed YARA version must lie strictly between 3.0 and 5.0
    installed_version = get_yara_version()
    assert LooseVersion('3.0') < installed_version < LooseVersion('5.0')
@pytest.fixture()
def yara_not_found(monkeypatch):
    # Replace `common.check_output` with a stub that always raises FileNotFoundError
    def _missing_binary(_command):
        raise FileNotFoundError

    monkeypatch.setattr(common, 'check_output', _missing_binary)
def test_get_yara_version_error(yara_not_found):
    # with check_output raising FileNotFoundError, no version can be determined
    version = get_yara_version()
    assert version is None
import os
import unittest
from common_helper_yara.yara_compile import compile_rules
from common_helper_yara.yara_scan import scan
from distutils.version import LooseVersion
from pathlib import Path
from tempfile import TemporaryDirectory
from common_helper_yara.common import get_yara_version
from common_helper_yara.yara_compile import compile_rules
from common_helper_yara.yara_scan import scan
DIR_OF_CURRENT_FILE = Path(__file__).parent
# get_yara_version() returns None when YARA is not found; comparing None with a
# LooseVersion would raise a TypeError while this module is being imported.
_YARA_VERSION = get_yara_version()
COMPILED_FLAG = _YARA_VERSION is not None and _YARA_VERSION >= LooseVersion('3.9')
def test_compile_and_scan():
    # Compile the test rules into a temporary file and scan the test data with the result
    with TemporaryDirectory(prefix="common_helper_yara_test_") as tmp_dir:
        input_dir = DIR_OF_CURRENT_FILE / 'data/rules'
        signature_file = Path(tmp_dir) / 'test.yc'
        data_files = DIR_OF_CURRENT_FILE / 'data/data_files'

        compile_rules(input_dir, signature_file, external_variables={'test_flag': 'true'})
        assert signature_file.exists(), "file not created"

        result = scan(signature_file, data_files, recursive=True, compiled=COMPILED_FLAG)
        assert 'lighttpd' in result.keys(), "at least one match missing"
        assert 'lighttpd_simple' in result.keys(), "at least one match missing"
import unittest
from common_helper_yara.yara_interpretation import get_all_matched_strings
# Result dict fixture with two rules; each 'strings' entry is a list of
# 3-tuples whose third element is the matched string as bytes.
TEST_DATA = {
'test_rule': {
'rule': 'test_rule', 'meta': {},
'strings': [(0, '$a', b'test_1'), (10, '$b', b'test_2')],
'matches': True
},
'test_rule2': {
'rule': 'test_rule2',
'meta': {},
'strings': [(0, '$a', b'test_1'), (10, '$b', b'test_3')], 'matches': True
},
}
class TestYaraInterpretation(unittest.TestCase):

    def test_get_all_matched_strings(self):
        # two rules that share the matched string 'test_1'
        first_rule = {'rule': 'test_rule', 'meta': {}, 'strings': [(0, '$a', b'test_1'), (10, '$b', b'test_2')], 'matches': True}
        second_rule = {'rule': 'test_rule2', 'meta': {}, 'strings': [(0, '$a', b'test_1'), (10, '$b', b'test_3')], 'matches': True}
        test_data = {'test_rule': first_rule, 'test_rule2': second_rule}
        result = get_all_matched_strings(test_data)
        self.assertEqual(result, {'test_1', 'test_2', 'test_3'}, "resulting strings not correct")
def test_get_all_matched_strings():
    # strings matched by several rules must appear only once in the result
    expected_strings = {'test_1', 'test_2', 'test_3'}
    assert get_all_matched_strings(TEST_DATA) == expected_strings, "resulting strings not correct"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment