Commit af754bb1 by Peter Weidenbach

init

parents
__pycache__
.project
.pydevproject
*.pyc
This diff is collapsed. Click to expand it.
# Common Analysis IP and URI finder
Detects IPv4-addresses, IPv6-addresses and URIs.
## Requirements
* [YARA](https://virustotal.github.io/yara/)
\ No newline at end of file
from .ip_and_uri_finder_analysis import CommonAnalysisIPAndURIFinder
from .ip_and_uri_finder_analysis import IPFinder
from .ip_and_uri_finder_analysis import URIFinder
__all__ = [
'IPFinder',
'URIFinder',
'CommonAnalysisIPAndURIFinder',
]
analysis_class = CommonAnalysisIPAndURIFinder
from common_analysis_base import AnalysisPluginFile
from common_helper_files import get_directory_for_filename, get_version_string_from_git
import socket
from sys import exit, exc_info
import logging
import pkg_resources
try:
import yara
except ImportError:
yara = None
exit("yara not found. please install yara")
logger = logging.getLogger('CommonAnalysisIPAndURIFinder')
logger.setLevel(logging.INFO)
try:
system_version = pkg_resources.get_distribution("common_analysis_ip_and_uri_finder").version
except:
system_version = get_version_string_from_git(get_directory_for_filename(__file__))
class FinderBase:
@staticmethod
def get_strings_from_matches(matches):
# the desired strings are contained in the tuples at the third position in each yara match object
return [item[2].decode() for match in matches for item in URIFinder.eliminate_overlaps(match.strings)]
@staticmethod
def get_file_content(file_path):
with open(file_path, "rb") as fp:
return fp.read()
class IPFinder(FinderBase):
def __init__(self, yara_ip_rules):
self.ip_rules = yara.compile(yara_ip_rules)
def _find_addresses_of_specific_format(self, string, address_format, validate=True):
try:
yara_matches = self.ip_rules.match(data=string)
except Exception as e:
logging.error("Could not match yara rules: {} {}".format(exc_info()[0].__name__, e))
return []
else:
result = self.get_strings_from_matches(yara_matches)
if validate:
result = self._validate_ips(result, address_format=address_format)
return result
def find_ipv4_addresses(self, string, validate=True):
return self._find_addresses_of_specific_format(string, socket.AF_INET, validate)
def find_ipv6_addresses(self, string, validate=True):
return self._find_addresses_of_specific_format(string, socket.AF_INET6, validate)
def find_ips(self, string, validate=True):
result = set()
result.update(self.find_ipv4_addresses(string, validate=validate))
result.update(self.find_ipv6_addresses(string, validate=validate))
return list(result)
def find_ipv4_addresses_in_file(self, file_path):
file_content = self.get_file_content(file_path)
return self.find_ipv4_addresses(file_content, validate=True)
def find_ipv6_addresses_in_file(self, file_path):
file_content = self.get_file_content(file_path)
return self.find_ipv6_addresses(file_content, validate=True)
def find_ips_in_file(self, file_path, validate=True):
file_content = self.get_file_content(file_path)
return self.find_ips(file_content, validate)
@staticmethod
def _validate_ip(ip, address_format):
try:
_ = socket.inet_pton(address_format, ip)
return True
except OSError:
return False
@staticmethod
def _validate_ips(ip_list, address_format):
result = []
for ip in ip_list[:]:
if IPFinder._validate_ip(ip, address_format):
result.append(ip)
return result
class URIFinder(FinderBase):
def __init__(self, yara_uri_rules):
self.rules = yara.compile(yara_uri_rules)
def find_uris(self, uri_string):
try:
yara_matches = self.rules.match(data=uri_string)
except Exception as e:
logging.error("Could not match yara rules: {} {}".format(exc_info()[0].__name__, e))
return []
else:
result = self.get_strings_from_matches(yara_matches)
return result
def find_urls_in_file(self, file_path):
file_content = self.get_file_content(file_path)
return self.find_uris(file_content)
@staticmethod
def eliminate_overlaps(yara_match_strings):
""" yara matches contain overlaps
e.g. if the string contains 123.123.123.123
the results would be 123.123.123.123, 23.123.123.123 and 3.123.123.123
"""
result = yara_match_strings[:]
for i in range(1, len(yara_match_strings)):
# if the matches are in direct succession
if yara_match_strings[i][0] - yara_match_strings[i - 1][0] == 1:
result.remove(yara_match_strings[i])
return result
class CommonAnalysisIPAndURIFinder(AnalysisPluginFile):
def __init__(self, yara_uri_rules=None, yara_ip_rules=None):
super(CommonAnalysisIPAndURIFinder, self).__init__(system_version)
self.yara_uri_rules = yara_uri_rules
self.yara_ip_rules = yara_ip_rules
def analyze_file(self, file_path, separate_ipv6=False):
found_uris, found_ips_v4, found_ips_v6 = [], [], []
if self.yara_uri_rules:
found_uris = URIFinder(self.yara_uri_rules).find_urls_in_file(file_path)
if self.yara_ip_rules:
ip_finder = IPFinder(self.yara_ip_rules)
found_ips_v4 = ip_finder.find_ipv4_addresses_in_file(file_path)
found_ips_v6 = ip_finder.find_ipv6_addresses_in_file(file_path)
report = self.prepare_analysis_report_dictionary()
report['uris'] = found_uris
if not separate_ipv6:
report['ips'] = found_ips_v4 + found_ips_v6
else:
report['ips_v4'] = found_ips_v4
report['ips_v6'] = found_ips_v6
return report
import unittest
import os
import tempfile
import socket
from common_analysis_ip_and_uri_finder import IPFinder, URIFinder
def find_file(name, root='.'):
file_path = None
for path, dirnames, filenames in os.walk(root):
for filename in filenames:
if filename == name:
return os.path.join(path, filename)
return file_path
class TestIpAndUrlFinder(unittest.TestCase):
def setUp(self):
self.yara_uri_rules = find_file('uri_rules.yara')
self.yara_ip_rules = find_file('ip_rules.yara')
self.test_string = "1.2.3.4 abc 123.123.123.123 abc 1.2.3 .4 abc 1.1234.1.1 abc 1.a.1.1" \
"1234:1234:abcd:abcd:1234:1234:abcd:abcd 2001:db8:0:8d3:0:8a2e:70:7344"
def test_find_ips(self):
results = IPFinder(self.yara_ip_rules).find_ips(self.test_string, validate=False)
expected_results = {"1.2.3.4", "123.123.123.123", "1234:1234:abcd:abcd:1234:1234:abcd:abcd",
"2001:db8:0:8d3:0:8a2e:70:7344"}
self.assertEqual(set(results), expected_results)
def test_find_ipv4_addresses(self):
results = IPFinder(self.yara_ip_rules).find_ipv4_addresses(self.test_string, validate=True)
expected_results = {"1.2.3.4", "123.123.123.123"}
self.assertEqual(set(results), expected_results)
def test_find_ipv6_addresses(self):
results = IPFinder(self.yara_ip_rules).find_ipv6_addresses(self.test_string, validate=True)
expected_results = {"1234:1234:abcd:abcd:1234:1234:abcd:abcd", "2001:db8:0:8d3:0:8a2e:70:7344"}
self.assertEqual(set(results), expected_results)
def test_find_ips_in_file(self):
with tempfile.NamedTemporaryFile() as test_file:
test_file.write(
b"""1.2.3.4
abc
255.255.255.255
1234:1234:abcd:abcd:1234:1234:abcd:abcd"""
)
test_file.seek(0)
results = set(IPFinder(self.yara_ip_rules).find_ips_in_file(test_file.name, validate=False))
expected_results = {"1.2.3.4", "255.255.255.255", "1234:1234:abcd:abcd:1234:1234:abcd:abcd"}
self.assertEqual(results, expected_results)
def test_validate_ipv4(self):
ips = ["1.1.1.1", "1.1.1", "a.1.1.1", "1.1.1.1.1"]
valid_ips = ["1.1.1.1"]
validated_ips = IPFinder(self.yara_ip_rules)._validate_ips(ips, socket.AF_INET)
self.assertEqual(validated_ips, valid_ips)
def test_validate_ipv6(self):
ips = ["1234:1234:abcd:abcd:1234:1234:1.0.0.127", "2001:db8::8d3::", "2001:db8:0:0:8d3::"]
valid_ips = ['1234:1234:abcd:abcd:1234:1234:1.0.0.127', "2001:db8:0:0:8d3::"]
validated_ips = IPFinder(self.yara_ip_rules)._validate_ips(ips, socket.AF_INET6)
self.assertEqual(validated_ips, valid_ips)
def test_find_uri(self):
test_string = "http://www.google.de https://www.test.de/test/ " \
"ftp://ftp.is.co.za/rfc/rfc1808.txt telnet://192.0.2.16:80/"
test_result = set(URIFinder(self.yara_uri_rules).find_uris(test_string))
expected_result = {"http://www.google.de", "https://www.test.de/test/", "ftp://ftp.is.co.za/rfc/rfc1808.txt",
"telnet://192.0.2.16:80/"}
self.assertEqual(test_result, expected_result)
rule IPv4 {
strings:
$a = /[\d]{1,3}(\.[\d]{1,3}){3}/
condition:
$a
}
rule IPv6 {
strings:
$a = /([\da-fA-F]{1,4})?\:([\da-fA-F]{1,4})?\:(([\da-fA-F]{1,4})?\:){0,5}([\da-fA-F]{1,4})?/
$b = /([\da-fA-F]{1,4})?\:([\da-fA-F]{1,4})?\:(([\da-fA-F]{1,4})?\:){0,5}[\d]{1,3}(\.[\d]{1,3}){3}/
condition:
$a or $b
}
\ No newline at end of file
rule URI {
strings:
$a = /[a-zA-Z]+\:\/\/[a-zA-Z0-9_\-\/.:]+/
condition:
$a
}
\ No newline at end of file
import os
import subprocess
from setuptools import setup, find_packages
setup(
name="common_analysis_ip_and_uri_finder",
version=subprocess.check_output(['git', 'describe', '--always'], cwd=os.path.dirname(os.path.abspath(__file__))).strip().decode('utf-8'),
packages=find_packages(),
install_requires=[
'common_analysis_base',
'common_helper_files',
'yara-python >= 3.5'
],
dependency_links=[
'git+https://github.com/mass-project/common_helper_files.git#common_helper_files',
'git+https://github.com/mass-project/common_analysis_base.git'
],
description="Analysis module to find IPs und URIs",
author="Fraunhofer FKIE, University of Bonn Institute of Computer Science 4",
license="GPL-3.0"
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment