Commit de62986b by Jörg Stucke

added fix for newer yara versions incombination with compiled rules + refactoring

parent 2dd5ad58
from .common import get_yara_version
from .yara_scan import scan
from .yara_compile import compile_rules
from .yara_interpretation import get_all_matched_strings
......@@ -5,5 +6,6 @@ from .yara_interpretation import get_all_matched_strings
__all__ = [
'scan',
'compile_rules',
'get_all_matched_strings'
]
'get_all_matched_strings',
'get_yara_version',
]
def convert_external_variables(ext_var_dict):
output = []
for ext_var in ext_var_dict:
output.append('-d {}={}'.format(ext_var, ext_var_dict[ext_var]))
return " ".join(sorted(output))
import logging
from distutils.version import LooseVersion
from shlex import split
from subprocess import check_output
from typing import Any, Dict, Optional
def convert_external_variables(ext_var_dict: Dict[str, Any]) -> str:
output = [f'-d {variable}={value}' for variable, value in ext_var_dict.items()]
return ' '.join(sorted(output))
def get_yara_version() -> Optional[LooseVersion]:
'''
Returns the YARA version as `distutils.version.LooseVersion` or None if YARA is not found.
:return: The installed YARA version or `None`
'''
try:
return LooseVersion(check_output(split('yara --version')).decode().strip())
except FileNotFoundError:
logging.warning('YARA not found. Is YARA installed?', exc_info=True)
return None
from common_helper_files.fail_safe_file_operations import get_files_in_dir
from pathlib import Path
from shlex import split
from typing import Dict, Optional, Any, Union
import logging
import subprocess
from tempfile import NamedTemporaryFile
......@@ -6,34 +9,36 @@ from tempfile import NamedTemporaryFile
from .common import convert_external_variables
def compile_rules(input_dir, out_file, external_variables={}):
def compile_rules(
input_dir: Union[str, Path],
out_file: Union[str, Path],
external_variables: Optional[Dict[str, Any]] = None,
):
'''
compile yara files in input dir
:param input_dir: directory with yara rules
:type input_dir: string
:param out_file: path to store the compiled yara rules
:type out_file: string
:return: None
:param external_variables: define external variables
'''
if external_variables is None:
external_variables = {}
with NamedTemporaryFile(mode='w') as tmp_file:
_create_joint_signature_file(input_dir, tmp_file)
_create_joint_signature_file(Path(input_dir), tmp_file)
_create_compiled_signature_file(out_file, tmp_file, external_variables)
return None
def _create_joint_signature_file(directory, tmp_file):
all_signatures = list()
for signature_file in sorted(get_files_in_dir(directory)):
with open(signature_file, 'rb') as fd:
all_signatures.append(fd.read())
with open(tmp_file.name, 'wb') as fd:
fd.write(b'\x0a'.join(all_signatures))
def _create_joint_signature_file(directory: Path, tmp_file: NamedTemporaryFile):
all_signatures = [
signature_file.read_bytes()
for signature_file in directory.iterdir()
]
Path(tmp_file.name).write_bytes(b'\n'.join(all_signatures))
def _create_compiled_signature_file(out_file, tmp_file, external_variables):
def _create_compiled_signature_file(out_file: Path, tmp_file: NamedTemporaryFile, external_variables: dict):
variables = convert_external_variables(external_variables)
try:
subprocess.run('yarac {} {} {}'.format(variables, tmp_file.name, out_file), shell=True, check=True)
subprocess.run(split(f'yarac {variables} {tmp_file.name} {out_file}'), check=True)
except subprocess.CalledProcessError:
logging.error('Creation of {} failed !!'.format(out_file))
logging.error(f'Creation of {out_file} failed!')
def get_all_matched_strings(yara_result_dict):
from typing import Set
def get_all_matched_strings(yara_result_dict: dict) -> Set[str]:
'''
returns a set of all matched strings
Get all strings matched by the yara rules
:param yara_result_dict: a result dict
:type yara_result_dict: dict
:return: set
:param yara_result_dict: a yara result dict
:return: a set of all matched strings
'''
matched_strings = set()
for matched_rule in yara_result_dict:
matched_strings.update(_get_matched_strings_of_single_rule(yara_result_dict[matched_rule]))
return matched_strings
return {
string
for matched_rule in yara_result_dict.values()
for string in _get_matched_strings_of_single_rule(matched_rule)
}
def _get_matched_strings_of_single_rule(yara_match):
matched_strings = set()
for string_item in yara_match['strings']:
matched_strings.add(string_item[2].decode('utf-8', 'replace'))
return matched_strings
return {
string_item[2].decode('utf-8', 'replace')
for string_item in yara_match['strings']
}
from subprocess import check_output, CalledProcessError, STDOUT
import sys
import re
import json
import logging
import re
from pathlib import Path
from subprocess import check_output, CalledProcessError, STDOUT
from typing import Optional, Any, Dict, Union
from .common import convert_external_variables
def scan(signature_path, file_path, external_variables={}, recursive=False):
def scan(
signature_path: Union[str, Path],
file_path: Union[str, Path],
external_variables: Optional[Dict[str, Any]] = None,
recursive: bool = False,
compiled: bool = False
) -> dict:
'''
Scan files and return matches
:param signature_path: path to signature file
:type signature_path: string
:param file_path: files to scan
:type file_path: string
:return: dict
:param external_variables: define external variables
:param recursive: scan recursively
:param compiled: rule is in compiled form (Yara >= 4 only!)
:return: a dict containing the scan results
'''
if external_variables is None:
external_variables = {}
variables = convert_external_variables(external_variables)
recursive = '-r' if recursive else ''
recursive_flag = '-r' if recursive else ''
compiled_flag = '-C' if compiled else ''
try:
scan_result = check_output("yara {} {} --print-meta --print-strings {} {}".format(variables, recursive, signature_path, file_path), shell=True, stderr=STDOUT)
command = f'yara {variables} {recursive_flag} {compiled_flag} -m -s {signature_path} {file_path}'
scan_result = check_output(command, shell=True, stderr=STDOUT)
return _parse_yara_output(scan_result.decode())
except CalledProcessError as e:
logging.error("There seems to be an error in the rule file:\n{}".format(e.output.decode()))
logging.error(f"There seems to be an error in the rule file:\n{e.output.decode()}")
return {}
try:
return _parse_yara_output(scan_result.decode())
except Exception as e:
logging.error('Could not parse yara result: {} {}'.format(sys.exc_info()[0].__name__, e))
logging.error(f'Could not parse yara result: {e}', exc_info=True)
return {}
......@@ -45,12 +57,12 @@ def _parse_yara_output(output):
def _split_output_in_rules_and_matches(output):
split_regex = re.compile(r'\n*.*\[.*\]\s\/.+\n*')
split_regex = re.compile(r'\n*.*\[.*]\s/.+\n*')
match_blocks = split_regex.split(output)
while '' in match_blocks:
match_blocks.remove('')
rule_regex = re.compile(r'(.*)\s\[(.*)\]\s([\.\.\/]|[\/]|[\.\/])(.+)')
rule_regex = re.compile(r'(.*)\s\[(.*)]\s([/]|[./])(.+)')
rules = rule_regex.findall(output)
assert len(match_blocks) == len(rules)
......@@ -79,8 +91,8 @@ def _parse_meta_data(meta_data_string):
for item in meta_data_string.split(','):
if '=' in item:
key, value = item.split('=', maxsplit=1)
value = json.loads(value) if value in ['true', 'false'] else value.strip('\"')
value = json.loads(value) if value in ['true', 'false'] else value.strip('"')
meta_data[key] = value
else:
logging.warning('Malformed meta string \'{}\''.format(meta_data_string))
logging.warning(f'Malformed meta string \'{meta_data_string}\'')
return meta_data
from setuptools import setup, find_packages
VERSION = "0.2.1"
VERSION = "0.3"
setup(
name="common_helper_yara",
version=VERSION,
packages=find_packages(),
install_requires=[
'common_helper_files @ git+https://github.com/fkie-cad/common_helper_files.git'
],
extras_require={
'dev': [
'pytest',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment