Commit d933773d by fwkz

New module loading mechanism.

parent 2d49f6f4
......@@ -11,6 +11,7 @@ from routersploit.utils import (
boolify,
mute,
multi,
index_modules,
)
from routersploit import exploits
......
......@@ -4,10 +4,8 @@ import sys
import traceback
import atexit
import importlib
import inspect
from routersploit.exceptions import RoutersploitException
from routersploit.exploits import Exploit
from routersploit import utils
from routersploit import modules as rsf_modules
......@@ -158,13 +156,12 @@ class RoutersploitInterpreter(BaseInterpreter):
self.raw_prompt_template = None
self.module_prompt_template = None
self.prompt_hostname = 'rsf'
self.modules_directory = rsf_modules.__path__[0]
self.modules = []
self.modules_with_errors = {}
self.main_modules_dirs = []
modules_directory = rsf_modules.__path__[0]
self.modules = utils.index_modules(modules_directory)
self.main_modules_dirs = [module for module in os.listdir(modules_directory) if not module.startswith("__")]
self.__parse_prompt()
self.load_modules()
self.banner = """ ______ _ _____ _ _ _
| ___ \ | | / ___| | | (_) |
......@@ -182,27 +179,32 @@ class RoutersploitInterpreter(BaseInterpreter):
Total module count: {modules_count}
""".format(modules_count=len(self.modules))
def load_modules(self):
self.main_modules_dirs = [module for module in os.listdir(self.modules_directory) if not module.startswith("__")]
self.modules = []
self.modules_with_errors = {}
for root, dirs, files in os.walk(self.modules_directory):
_, package, root = root.rpartition('routersploit')
root = "".join((package, root)).replace(os.sep, '.')
modules = map(lambda x: '.'.join((root, os.path.splitext(x)[0])), filter(lambda x: x.endswith('.py'), files))
for module_path in modules:
try:
module = importlib.import_module(module_path)
except ImportError as error:
self.modules_with_errors[module_path] = error
else:
klasses = inspect.getmembers(module, inspect.isclass)
exploits = filter(lambda x: issubclass(x[1], Exploit), klasses)
# exploits = map(lambda x: '.'.join([module_path.split('.', 2).pop(), x[0]]), exploits)
# self.modules.extend(exploits)
if exploits:
self.modules.append(module_path.split('.', 2).pop())
# def load_modules(self):
# self.main_modules_dirs = [module for module in os.listdir(self.modules_directory) if not module.startswith("__")]
# self.modules = []
# self.modules_with_errors = {}
#
# for root, dirs, files in os.walk(self.modules_directory):
# _, package, root = root.rpartition('routersploit/modules/'.replace('/', os.sep))
# root = root.replace(os.sep, '.')
# files = filter(lambda x: not x.startswith("__") and x.endswith('.py'), files)
# self.modules.extend(map(lambda x: '.'.join((root, os.path.splitext(x)[0])), files))
#
# exploits = map(lambda x: '.'.join([module_path.split('.', 2).pop(), x[0]]), exploits)
#
# for module_path in self.modules:
# print(module_path)
# try:
# module = importlib.import_module(module_path)
# except ImportError as error:
# self.modules_with_errors[module_path] = error
# else:
# klasses = inspect.getmembers(module, inspect.isclass)
# exploits = filter(lambda x: issubclass(x[1], Exploit), klasses)
# # exploits = map(lambda x: '.'.join([module_path.split('.', 2).pop(), x[0]]), exploits)
# # self.modules.extend(exploits)
# if exploits:
# self.modules.append(module_path.split('.', 2).pop())
def __parse_prompt(self):
raw_prompt_default_template = "\001\033[4m\002{host}\001\033[0m\002 > "
......@@ -259,9 +261,9 @@ class RoutersploitInterpreter(BaseInterpreter):
:return: list of most accurate command suggestions
"""
if self.current_module:
return ['run', 'back', 'set ', 'show ', 'check', 'debug', 'exit']
return ['run', 'back', 'set ', 'show ', 'check', 'exit']
else:
return ['use ', 'debug', 'exit']
return ['use ', 'exit']
def command_back(self, *args, **kwargs):
self.current_module = None
......@@ -271,12 +273,9 @@ class RoutersploitInterpreter(BaseInterpreter):
module_path = '.'.join(('routersploit', 'modules', module_path))
# module_path, _, exploit_name = module_path.rpartition('.')
try:
module = importlib.import_module(module_path)
self.current_module = getattr(module, 'Exploit')()
except (ImportError, AttributeError, KeyError):
utils.print_error("Error during loading '{}' module. "
"It should be valid path to the module. "
"Use <tab> key multiple times for completion.".format(utils.humanize_path(module_path)))
self.current_module = utils.import_exploit(module_path)()
except RoutersploitException as err:
utils.print_error(err.message)
@utils.stop_after(2)
def complete_use(self, text, *args, **kwargs):
......@@ -392,10 +391,5 @@ class RoutersploitInterpreter(BaseInterpreter):
else:
utils.print_status("Target could not be verified")
def command_debug(self, *args, **kwargs):
for key, value in self.modules_with_errors.iteritems():
utils.print_info(key)
utils.print_error(value, '\n')
def command_exit(self, *args, **kwargs):
raise KeyboardInterrupt
......@@ -30,7 +30,7 @@ class RoutersploitCompleterTest(unittest.TestCase):
def test_raw_commands_no_module(self):
self.rsf.send("\t\t")
self.assertPrompt('debug exit use \r\n', self.raw_prompt)
self.assertPrompt('exit use \r\n', self.raw_prompt)
def test_complete_use_raw(self):
self.rsf.send("u\t\t")
......@@ -87,7 +87,7 @@ class RoutersploitCompleterTest(unittest.TestCase):
self.set_module()
self.rsf.send("\t\t")
self.assertPrompt(
'back check debug exit run set show \r\n',
'back check exit run set show \r\n',
self.module_prompt('FTP Bruteforce')
)
......
from __future__ import print_function
import unittest
import os
import inspect
try:
import unittest.mock as mock
......@@ -9,11 +8,7 @@ except ImportError:
import mock
from routersploit.interpreter import RoutersploitInterpreter
from routersploit.exploits import Exploit
class TestExploit(Exploit):
pass
from routersploit.exceptions import RoutersploitException
class RoutersploitInterpreterTest(unittest.TestCase):
......@@ -197,14 +192,14 @@ class RoutersploitInterpreterTest(unittest.TestCase):
def test_suggested_commands_with_loaded_module(self):
self.assertEqual(
self.interpreter.suggested_commands(),
['run', 'back', 'set ', 'show ', 'check', 'debug', 'exit'] # Extra space at the end because of following param
['run', 'back', 'set ', 'show ', 'check', 'exit'] # Extra space at the end because of following param
)
def test_suggested_commands_without_loaded_module(self):
self.interpreter.current_module = None
self.assertEqual(
self.interpreter.suggested_commands(), # Extra space at the end because of following param
['use ', 'debug', 'exit']
['use ', 'exit']
)
@mock.patch('importlib.import_module')
......@@ -255,14 +250,17 @@ class RoutersploitInterpreterTest(unittest.TestCase):
self.interpreter.current_module = None
self.interpreter.modules = ['doo/pa/foo/bar']
module_path = "creds/foo/bar/baz"
mocked_import_module.side_effect = ImportError
mocked_import_module.side_effect = ImportError("Not working")
self.interpreter.command_use(module_path)
mocked_import_module.assert_called_once_with('routersploit.modules.creds.foo.bar.baz')
mocked_print_error.assert_called_once_with("Error during loading 'routersploit/modules/creds/foo/bar/baz' "
"module. It should be valid path to the module. "
"Use <tab> key multiple times for completion.")
mocked_print_error.assert_called_once_with(
"Error during loading 'routersploit/modules/creds/foo/bar/baz'\n\n"
"Error: Not working\n\n"
"It should be valid path to the module. Use <tab> key multiple times for completion."
)
self.assertEqual(self.interpreter.current_module, None)
@mock.patch('importlib.import_module')
......@@ -282,9 +280,12 @@ class RoutersploitInterpreterTest(unittest.TestCase):
self.interpreter.command_use(module_path)
mocked_import_module.assert_called_once_with('routersploit.modules.exploits.foo.bar')
mocked_print_error.assert_called_once_with("Error during loading 'routersploit/modules/exploits/foo/bar' "
"module. It should be valid path to the module. "
"Use <tab> key multiple times for completion.")
mocked_print_error.assert_called_once_with(
"Error during loading 'routersploit/modules/exploits/foo/bar'\n\n"
"Error: Exploit\n\n"
"It should be valid path to the module. Use <tab> key multiple times for completion."
)
self.assertEqual(self.interpreter.current_module, None)
@mock.patch('__builtin__.print')
......@@ -433,140 +434,6 @@ class RoutersploitInterpreterTest(unittest.TestCase):
"module_required"
)
@mock.patch('os.walk')
@mock.patch('importlib.import_module')
@mock.patch('inspect.getmembers')
def test_load_modules(self, mock_getmembers, mock_import_module, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc']),
)
mock_import_module.side_effect = [1, 2, 3, 4, 5]
mock_getmembers.side_effect = [
[],
[],
[("FTPBruteforce", TestExploit), ('SomeClass', mock.MagicMock), ('Exploit123', TestExploit)],
[],
[("Exploit", TestExploit), ('SomeClass', mock.MagicMock)]
]
self.interpreter.load_modules()
mock_walk.assert_called_once_with(self.interpreter.modules_directory)
self.assertEqual(
mock_import_module.mock_calls,
[
mock.call('routersploit.modules.__init__'),
mock.call('routersploit.modules.creds.__init__'),
mock.call('routersploit.modules.creds.ftp_bruteforce'),
mock.call('routersploit.modules.exploits.asmax.__init__'),
mock.call('routersploit.modules.exploits.asmax.asmax_exploit')
]
)
self.assertEqual(
mock_getmembers.mock_calls,
[
mock.call(1, inspect.isclass),
mock.call(2, inspect.isclass),
mock.call(3, inspect.isclass),
mock.call(4, inspect.isclass),
mock.call(5, inspect.isclass),
]
)
self.assertEqual(
self.interpreter.modules,
[
'creds.ftp_bruteforce',
'exploits.asmax.asmax_exploit'
]
)
@mock.patch('os.walk')
@mock.patch('importlib.import_module')
@mock.patch('inspect.getmembers')
def test_load_modules_import_error(self, mock_getmembers, mock_import_module, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc', 'asmax_multi.py', 'asmax_multi.pyc']),
)
import_error = ImportError("No module doopaa")
mock_import_module.side_effect = [1, 2, import_error, 4, 5, import_error]
mock_getmembers.side_effect = [
[],
[],
[],
[("Exploit", TestExploit), ('SomeClass', mock.MagicMock)]
]
self.interpreter.load_modules()
mock_walk.assert_called_once_with(self.interpreter.modules_directory)
self.assertEqual(
mock_import_module.mock_calls,
[
mock.call('routersploit.modules.__init__'),
mock.call('routersploit.modules.creds.__init__'),
mock.call('routersploit.modules.creds.ftp_bruteforce'),
mock.call('routersploit.modules.exploits.asmax.__init__'),
mock.call('routersploit.modules.exploits.asmax.asmax_exploit'),
mock.call('routersploit.modules.exploits.asmax.asmax_multi')
]
)
self.assertEqual(
mock_getmembers.mock_calls,
[
mock.call(1, inspect.isclass),
mock.call(2, inspect.isclass),
mock.call(4, inspect.isclass),
mock.call(5, inspect.isclass),
]
)
self.assertEqual(
self.interpreter.modules,
[
'exploits.asmax.asmax_exploit'
]
)
self.assertEqual(
self.interpreter.modules_with_errors,
{
"routersploit.modules.creds.ftp_bruteforce": import_error,
'routersploit.modules.exploits.asmax.asmax_multi': import_error,
}
)
@mock.patch('routersploit.utils.print_info')
@mock.patch('routersploit.utils.print_error')
def test_command_debug(self, mocked_print_error, mocked_print_info, ):
self.interpreter.modules_with_errors = {
"foo.bar.exploit": "foo foo error",
"foo.baz.exploit": "foo baz error",
"doo.paa.exploit": "doo paa error",
}
self.interpreter.command_debug()
self.assertItemsEqual(
mocked_print_info.mock_calls,
[
mock.call("foo.baz.exploit"),
mock.call("foo.bar.exploit"),
mock.call("doo.paa.exploit"),
]
)
self.assertItemsEqual(
mocked_print_error.mock_calls,
[
mock.call("doo paa error", '\n'),
mock.call("foo foo error", '\n'),
mock.call("foo baz error", '\n'),
]
)
def test_command_exit(self):
with self.assertRaises(KeyboardInterrupt):
self.interpreter.command_exit()
......
from __future__ import print_function
from __future__ import absolute_import
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from routersploit.utils import index_modules
class UtilsTest(unittest.TestCase):
@mock.patch('os.walk')
def test_load_modules_01(self, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc']),
)
path = 'path/to/module'
modules = index_modules(path)
mock_walk.assert_called_once_with(path)
self.assertEqual(
modules,
[
'creds.ftp_bruteforce',
'exploits.asmax.asmax_exploit'
]
)
@mock.patch('os.walk')
def test_load_modules_import_error_02(self, mock_walk):
mock_walk.return_value = (
('/Abs/Path/routersploit/routersploit/modules', ['asmax', 'creds'], ['__init__.py', '__init__.pyc']),
('/Abs/Path/routersploit/routersploit/modules/creds', [], ['__init__.py', '__init__.pyc', 'ftp_bruteforce.py', 'ftp_bruteforce.pyc']),
('/Abs/Path/routersploit/routersploit/modules/exploits/asmax', [], ['__init__.py', '__init__.pyc', 'asmax_exploit.py', 'asmax_exploit.pyc', 'asmax_multi.py', 'asmax_multi.pyc']),
)
path = 'path/to/module'
modules = index_modules(path)
mock_walk.assert_called_once_with(path)
self.assertEqual(
modules,
[
'creds.ftp_bruteforce',
'exploits.asmax.asmax_exploit',
'exploits.asmax.asmax_multi',
]
)
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
from __future__ import print_function
from __future__ import absolute_import
import threading
from functools import wraps
from distutils.util import strtobool
import os
import sys
import random
import string
import socket
from functools import wraps
from distutils.util import strtobool
import importlib
import requests
from .exceptions import RoutersploitException
print_lock = threading.Lock()
......@@ -20,6 +25,36 @@ colors = {
}
def index_modules(modules_directory):
""" Return list of all exploits modules """
modules = []
for root, dirs, files in os.walk(modules_directory):
_, package, root = root.rpartition('routersploit/modules/'.replace('/', os.sep))
root = root.replace(os.sep, '.')
files = filter(lambda x: not x.startswith("__") and x.endswith('.py'), files)
modules.extend(map(lambda x: '.'.join((root, os.path.splitext(x)[0])), files))
return modules
def import_exploit(path):
""" Import exploit module
:param path: absolute path to exploit e.g. routersploit.modules.exploits.asus.pass_bypass
:return: exploit module or error
"""
try:
module = importlib.import_module(path)
return getattr(module, 'Exploit')
except (ImportError, AttributeError, KeyError) as err:
raise RoutersploitException(
"Error during loading '{}'\n\n"
"Error: {}\n\n"
"It should be valid path to the module. "
"Use <tab> key multiple times for completion.".format(humanize_path(path), err)
)
def pythonize_path(path):
""" Replace argument to valid python dotted notation.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment