Commit 55fa0b1f by fwkz

Merge branch 'allonhadaya-module-tests'

parents 2e2f1646 17857b69
......@@ -11,6 +11,7 @@ from routersploit.utils import (
boolify,
mute,
multi,
index_modules,
)
from routersploit import exploits
......
......@@ -3,13 +3,9 @@ import os
import sys
import traceback
import atexit
import importlib
import inspect
from routersploit.exceptions import RoutersploitException
from routersploit.exploits import Exploit
from routersploit import utils
from routersploit import modules as rsf_modules
if sys.platform == "darwin":
import gnureadline as readline
......@@ -158,13 +154,11 @@ class RoutersploitInterpreter(BaseInterpreter):
self.raw_prompt_template = None
self.module_prompt_template = None
self.prompt_hostname = 'rsf'
self.modules_directory = rsf_modules.__path__[0]
self.modules = []
self.modules_with_errors = {}
self.main_modules_dirs = []
self.modules = utils.index_modules()
self.main_modules_dirs = [module for module in os.listdir(utils.MODULES_DIR) if not module.startswith("__")]
self.__parse_prompt()
self.load_modules()
self.banner = """ ______ _ _____ _ _ _
| ___ \ | | / ___| | | (_) |
......@@ -182,28 +176,6 @@ class RoutersploitInterpreter(BaseInterpreter):
Total module count: {modules_count}
""".format(modules_count=len(self.modules))
def load_modules(self):
self.main_modules_dirs = [module for module in os.listdir(self.modules_directory) if not module.startswith("__")]
self.modules = []
self.modules_with_errors = {}
for root, dirs, files in os.walk(self.modules_directory):
_, package, root = root.rpartition('routersploit')
root = "".join((package, root)).replace(os.sep, '.')
modules = map(lambda x: '.'.join((root, os.path.splitext(x)[0])), filter(lambda x: x.endswith('.py'), files))
for module_path in modules:
try:
module = importlib.import_module(module_path)
except ImportError as error:
self.modules_with_errors[module_path] = error
else:
klasses = inspect.getmembers(module, inspect.isclass)
exploits = filter(lambda x: issubclass(x[1], Exploit), klasses)
# exploits = map(lambda x: '.'.join([module_path.split('.', 2).pop(), x[0]]), exploits)
# self.modules.extend(exploits)
if exploits:
self.modules.append(module_path.split('.', 2).pop())
def __parse_prompt(self):
raw_prompt_default_template = "\001\033[4m\002{host}\001\033[0m\002 > "
raw_prompt_template = os.getenv("RSF_RAW_PROMPT", raw_prompt_default_template).replace('\\033', '\033')
......@@ -259,9 +231,9 @@ class RoutersploitInterpreter(BaseInterpreter):
:return: list of most accurate command suggestions
"""
if self.current_module:
return ['run', 'back', 'set ', 'show ', 'check', 'debug', 'exit']
return ['run', 'back', 'set ', 'show ', 'check', 'exit']
else:
return ['use ', 'debug', 'exit']
return ['use ', 'exit']
def command_back(self, *args, **kwargs):
self.current_module = None
......@@ -271,12 +243,9 @@ class RoutersploitInterpreter(BaseInterpreter):
module_path = '.'.join(('routersploit', 'modules', module_path))
# module_path, _, exploit_name = module_path.rpartition('.')
try:
module = importlib.import_module(module_path)
self.current_module = getattr(module, 'Exploit')()
except (ImportError, AttributeError, KeyError):
utils.print_error("Error during loading '{}' module. "
"It should be valid path to the module. "
"Use <tab> key multiple times for completion.".format(utils.humanize_path(module_path)))
self.current_module = utils.import_exploit(module_path)()
except RoutersploitException as err:
utils.print_error(err.message)
@utils.stop_after(2)
def complete_use(self, text, *args, **kwargs):
......@@ -392,10 +361,5 @@ class RoutersploitInterpreter(BaseInterpreter):
else:
utils.print_status("Target could not be verified")
def command_debug(self, *args, **kwargs):
for key, value in self.modules_with_errors.iteritems():
utils.print_info(key)
utils.print_error(value, '\n')
def command_exit(self, *args, **kwargs):
raise KeyboardInterrupt
......@@ -23,9 +23,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'FTP Bruteforce',
'authors': [
'description': 'Module performs bruteforce attack against FTP service.'
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -22,9 +22,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'FTP Default Creds',
'authors': [
'description': 'Module perform dictionary attack with default credentials against FTP service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -23,9 +23,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'HTTP Basic Bruteforce',
'authors': [
'description': 'Module performs bruteforce attack against HTTP Basic Auth service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -22,9 +22,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'HTTP Basic Default Creds',
'authors': [
'description': 'Module perform dictionary attack with default credentials against HTTP Basic Auth service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -24,9 +24,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'HTTP Form Bruteforce',
'authors': [
'description': 'Module performs bruteforce attack against HTTP form service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -23,9 +23,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'HTTP Form Default Creds',
'authors': [
'description': 'Module performs dictionary attack with default credentials against HTTP form service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -21,7 +21,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'SNMP Bruteforce',
'authors': 'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
'description': 'Module performs bruteforce attack against SNMP service. '
'If valid community string is found, it is displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>', # routersploit module
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -23,7 +23,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'SSH Bruteforce',
'authors': 'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
'description': 'Module performs bruteforce attack against SSH service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>', # routersploit module
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -22,9 +22,14 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'SSH Default Creds',
'authors': [
'description': 'Module perform dictionary attack with default credentials against SSH service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -22,7 +22,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'Telnet Bruteforce',
'authors': 'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
'description': 'Module performs bruteforce attack against Telnet service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>', # routersploit module
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -21,9 +21,13 @@ class Exploit(exploits.Exploit):
"""
__info__ = {
'name': 'Telnet Default Creds',
'authors': [
'description': 'Module perform dictionary attack with default credentials against Telnet service. '
'If valid credentials are found, they are displayed to the user.',
'authors': (
'Marcin Bury <marcin.bury[at]reverse-shell.com>' # routersploit module
]
),
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address or file with target:port (file://)')
......
......@@ -20,6 +20,8 @@ class Exploit(exploits.Exploit):
'authors': [
'Marcin Bury <marcin.bury[at]reverse-shell.com>', # routersploit module
],
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address e.g. 192.168.1.1') # target address
......
......@@ -20,6 +20,8 @@ class Exploit(exploits.Exploit):
'authors': [
'Marcin Bury <marcin.bury[at]reverse-shell.com>', # routersploit module
],
'references': '',
'devices': 'Multi',
}
target = exploits.Option('', 'Target IP address e.g. 192.168.1.1') # target address
......
......@@ -30,7 +30,7 @@ class RoutersploitCompleterTest(unittest.TestCase):
def test_raw_commands_no_module(self):
self.rsf.send("\t\t")
self.assertPrompt('debug exit use \r\n', self.raw_prompt)
self.assertPrompt('exit use \r\n', self.raw_prompt)
def test_complete_use_raw(self):
self.rsf.send("u\t\t")
......@@ -87,7 +87,7 @@ class RoutersploitCompleterTest(unittest.TestCase):
self.set_module()
self.rsf.send("\t\t")
self.assertPrompt(
'back check debug exit run set show \r\n',
'back check exit run set show \r\n',
self.module_prompt('FTP Bruteforce')
)
......
from __future__ import print_function
import unittest
import os
import inspect
try:
import unittest.mock as mock
......@@ -9,11 +8,7 @@ except ImportError:
import mock
from routersploit.interpreter import RoutersploitInterpreter
from routersploit.exploits import Exploit
class TestExploit(Exploit):
pass
from routersploit.exceptions import RoutersploitException
class RoutersploitInterpreterTest(unittest.TestCase):
......@@ -197,14 +192,14 @@ class RoutersploitInterpreterTest(unittest.TestCase):
def test_suggested_commands_with_loaded_module(self):
self.assertEqual(
self.interpreter.suggested_commands(),
['run', 'back', 'set ', 'show ', 'check', 'debug', 'exit'] # Extra space at the end because of following param
['run', 'back', 'set ', 'show ', 'check', 'exit'] # Extra space at the end because of following param
)
def test_suggested_commands_without_loaded_module(self):
self.interpreter.current_module = None
self.assertEqual(
self.interpreter.suggested_commands(), # Extra space at the end because of following param
['use ', 'debug', 'exit']
['use ', 'exit']
)
@mock.patch('importlib.import_module')
......@@ -255,14 +250,17 @@ class RoutersploitInterpreterTest(unittest.TestCase):
self.interpreter.current_module = None
self.interpreter.modules = ['doo/pa/foo/bar']
module_path = "creds/foo/bar/baz"
mocked_import_module.side_effect = ImportError
mocked_import_module.side_effect = ImportError("Not working")
self.interpreter.command_use(module_path)
mocked_import_module.assert_called_once_with('routersploit.modules.creds.foo.bar.baz')
mocked_print_error.assert_called_once_with("Error during loading 'routersploit/modules/creds/foo/bar/baz' "
"module. It should be valid path to the module. "
"Use <tab> key multiple times for completion.")
mocked_print_error.assert_called_once_with(
"Error during loading 'routersploit/modules/creds/foo/bar/baz'\n\n"
"Error: Not working\n\n"
"It should be valid path to the module. Use <tab> key multiple times for completion."
)
self.assertEqual(self.interpreter.current_module, None)
@mock.patch('importlib.import_module')
......@@ -282,9 +280,12 @@ class RoutersploitInterpreterTest(unittest.TestCase):
self.interpreter.command_use(module_path)
mocked_import_module.assert_called_once_with('routersploit.modules.exploits.foo.bar')
mocked_print_error.assert_called_once_with("Error during loading 'routersploit/modules/exploits/foo/bar' "
"module. It should be valid path to the module. "
"Use <tab> key multiple times for completion.")
mocked_print_error.assert_called_once_with(
"Error during loading 'routersploit/modules/exploits/foo/bar'\n\n"
"Error: Exploit\n\n"
"It should be valid path to the module. Use <tab> key multiple times for completion."
)
self.assertEqual(self.interpreter.current_module, None)
@mock.patch('__builtin__.print')
......@@ -433,140 +434,6 @@ class RoutersploitInterpreterTest(unittest.TestCase):
"module_required"
)
@mock.patch('os.walk')
@mock.patch('importlib.import_module')
@mock.patch('inspect.getmembers')
def test_load_modules(self, mock_getmembers, mock_import_module, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc']),
)
mock_import_module.side_effect = [1, 2, 3, 4, 5]
mock_getmembers.side_effect = [
[],
[],
[("FTPBruteforce", TestExploit), ('SomeClass', mock.MagicMock), ('Exploit123', TestExploit)],
[],
[("Exploit", TestExploit), ('SomeClass', mock.MagicMock)]
]
self.interpreter.load_modules()
mock_walk.assert_called_once_with(self.interpreter.modules_directory)
self.assertEqual(
mock_import_module.mock_calls,
[
mock.call('routersploit.modules.__init__'),
mock.call('routersploit.modules.creds.__init__'),
mock.call('routersploit.modules.creds.ftp_bruteforce'),
mock.call('routersploit.modules.exploits.asmax.__init__'),
mock.call('routersploit.modules.exploits.asmax.asmax_exploit')
]
)
self.assertEqual(
mock_getmembers.mock_calls,
[
mock.call(1, inspect.isclass),
mock.call(2, inspect.isclass),
mock.call(3, inspect.isclass),
mock.call(4, inspect.isclass),
mock.call(5, inspect.isclass),
]
)
self.assertEqual(
self.interpreter.modules,
[
'creds.ftp_bruteforce',
'exploits.asmax.asmax_exploit'
]
)
@mock.patch('os.walk')
@mock.patch('importlib.import_module')
@mock.patch('inspect.getmembers')
def test_load_modules_import_error(self, mock_getmembers, mock_import_module, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc', 'asmax_multi.py', 'asmax_multi.pyc']),
)
import_error = ImportError("No module doopaa")
mock_import_module.side_effect = [1, 2, import_error, 4, 5, import_error]
mock_getmembers.side_effect = [
[],
[],
[],
[("Exploit", TestExploit), ('SomeClass', mock.MagicMock)]
]
self.interpreter.load_modules()
mock_walk.assert_called_once_with(self.interpreter.modules_directory)
self.assertEqual(
mock_import_module.mock_calls,
[
mock.call('routersploit.modules.__init__'),
mock.call('routersploit.modules.creds.__init__'),
mock.call('routersploit.modules.creds.ftp_bruteforce'),
mock.call('routersploit.modules.exploits.asmax.__init__'),
mock.call('routersploit.modules.exploits.asmax.asmax_exploit'),
mock.call('routersploit.modules.exploits.asmax.asmax_multi')
]
)
self.assertEqual(
mock_getmembers.mock_calls,
[
mock.call(1, inspect.isclass),
mock.call(2, inspect.isclass),
mock.call(4, inspect.isclass),
mock.call(5, inspect.isclass),
]
)
self.assertEqual(
self.interpreter.modules,
[
'exploits.asmax.asmax_exploit'
]
)
self.assertEqual(
self.interpreter.modules_with_errors,
{
"routersploit.modules.creds.ftp_bruteforce": import_error,
'routersploit.modules.exploits.asmax.asmax_multi': import_error,
}
)
@mock.patch('routersploit.utils.print_info')
@mock.patch('routersploit.utils.print_error')
def test_command_debug(self, mocked_print_error, mocked_print_info, ):
self.interpreter.modules_with_errors = {
"foo.bar.exploit": "foo foo error",
"foo.baz.exploit": "foo baz error",
"doo.paa.exploit": "doo paa error",
}
self.interpreter.command_debug()
self.assertItemsEqual(
mocked_print_info.mock_calls,
[
mock.call("foo.baz.exploit"),
mock.call("foo.bar.exploit"),
mock.call("doo.paa.exploit"),
]
)
self.assertItemsEqual(
mocked_print_error.mock_calls,
[
mock.call("doo paa error", '\n'),
mock.call("foo foo error", '\n'),
mock.call("foo baz error", '\n'),
]
)
def test_command_exit(self):
with self.assertRaises(KeyboardInterrupt):
self.interpreter.command_exit()
......
import unittest
from routersploit.utils import iter_modules
class ModuleTest(unittest.TestCase):
"""A test case that every module must pass.
Attributes:
module (Exploit): The exploit instance of the module being tested.
metadata (Dict): The info associated with the module.
"""
def __init__(self, methodName='runTest', module=None):
super(ModuleTest, self).__init__(methodName)
self.module = module
def __str__(self):
return " ".join([super(ModuleTest, self).__str__(), self.module.__module__])
@property
def module_metadata(self):
return getattr(self.module, "_{}__info__".format(self.module.__name__))
def test_required_metadata(self):
required_metadata = (
"name",
"description",
"devices",
"authors",
"references"
)
self.assertItemsEqual(required_metadata, self.module_metadata.keys())
def load_tests(loader, tests, pattern):
""" Map every module to a test case, and group them into a suite. """
suite = unittest.TestSuite()
test_names = loader.getTestCaseNames(ModuleTest)
for module in iter_modules():
suite.addTests([ModuleTest(name, module) for name in test_names])
return suite
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
from __future__ import print_function
from __future__ import absolute_import
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from routersploit.utils import index_modules
class UtilsTest(unittest.TestCase):
@mock.patch('os.walk')
def test_load_modules_01(self, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc']),
)
path = 'path/to/module'
modules = index_modules(path)
mock_walk.assert_called_once_with(path)
self.assertEqual(
modules,
[
'creds.ftp_bruteforce',
'exploits.asmax.asmax_exploit'
]
)
@mock.patch('os.walk')
def test_load_modules_import_error_02(self, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc', 'asmax_multi.py', 'asmax_multi.pyc']),
)
path = 'path/to/module'
modules = index_modules(path)
mock_walk.assert_called_once_with(path)
self.assertEqual(
modules,
[
'creds.ftp_bruteforce',
'exploits.asmax.asmax_exploit',
'exploits.asmax.asmax_multi',
]
)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
from __future__ import print_function
from __future__ import absolute_import
import threading
from functools import wraps
from distutils.util import strtobool
import os
import sys
import random
import string
import socket
from functools import wraps
from distutils.util import strtobool
import importlib
import requests
from .exceptions import RoutersploitException
from . import modules as rsf_modules
MODULES_DIR = rsf_modules.__path__[0]
print_lock = threading.Lock()
......@@ -20,6 +28,49 @@ colors = {
}
def index_modules(modules_directory=MODULES_DIR):
""" Return list of all exploits modules """
modules = []
for root, dirs, files in os.walk(modules_directory):
_, package, root = root.rpartition('routersploit/modules/'.replace('/', os.sep))
root = root.replace(os.sep, '.')
files = filter(lambda x: not x.startswith("__") and x.endswith('.py'), files)
modules.extend(map(lambda x: '.'.join((root, os.path.splitext(x)[0])), files))
return modules
def import_exploit(path):
""" Import exploit module
:param path: absolute path to exploit e.g. routersploit.modules.exploits.asus.pass_bypass
:return: exploit module or error
"""
try:
module = importlib.import_module(path)
return getattr(module, 'Exploit')
except (ImportError, AttributeError, KeyError) as err:
raise RoutersploitException(
"Error during loading '{}'\n\n"
"Error: {}\n\n"
"It should be valid path to the module. "
"Use <tab> key multiple times for completion.".format(humanize_path(path), err)
)
def iter_modules(modules_directory=MODULES_DIR):
""" Iterate over valid modules """
modules = index_modules(modules_directory)
modules = map(lambda x: "".join(['routersploit.modules.', x]), modules)
for path in modules:
try:
yield import_exploit(path)
except RoutersploitException:
pass
def pythonize_path(path):
""" Replace argument to valid python dotted notation.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment