Unverified Commit 6af115ef by Marcin Bury Committed by GitHub

Fixing methods, adding type annotaions and doc strings (#534)

parent 581acc64
class RoutersploitException(Exception):
def __init__(self, msg=""):
def __init__(self, msg: str=""):
super(RoutersploitException, self).__init__(msg)
......
......@@ -14,6 +14,19 @@ from routersploit.core.exploit.option import Option
GLOBAL_OPTS = {}
class Protocol:
CUSTOM = "custom"
TCP = "custom/tcp"
UDP = "custom/udp"
FTP = "ftp"
FTPS = "ftps"
SSH = "ssh"
TELNET = "telnet"
HTTP = "http"
HTTPS = "https"
SNMP = "snmp"
class ExploitOptionsAggregator(type):
""" Metaclass for exploit base class.
......@@ -62,7 +75,7 @@ class BaseExploit(with_metaclass(ExploitOptionsAggregator, object)):
class Exploit(BaseExploit):
""" Base class for exploits """
target_protocol = "custom"
target_protocol = Protocol.CUSTOM
def run(self):
raise NotImplementedError("You have to define your own 'run' method.")
......@@ -70,7 +83,16 @@ class Exploit(BaseExploit):
def check(self):
raise NotImplementedError("You have to define your own 'check' method.")
def run_threads(self, threads_number, target_function, *args, **kwargs):
def run_threads(self, threads_number: int, target_function: any, *args, **kwargs) -> None:
""" Run function across specified number of threads
:param int thread_number: number of threads that should be executed
:param func target_function: function that should be executed accross specified number of threads
:param any args: args passed to target_function
:param any kwargs: kwargs passed to target function
:return None
"""
threads = []
threads_running = threading.Event()
threads_running.set()
......@@ -189,15 +211,3 @@ class LockedIterator(object):
return item
finally:
self.lock.release()
class Protocol:
TCP = "custom/tcp"
UDP = "custom/udp"
FTP = "ftp"
FTPS = "ftps"
SSH = "ssh"
TELNET = "telnet"
HTTP = "http"
HTTPS = "https"
SNMP = "snmp"
......@@ -27,8 +27,6 @@ from routersploit.core.exploit.utils import (
architectures = namedtuple("ArchitectureType", ["ARMLE", "MIPSBE", "MIPSLE", "X86", "X64", "PERL", "PHP", "PYTHON"])
payload_handlers = namedtuple("PayloadHandlers", ["BIND_TCP", "REVERSE_TCP"])
Architectures = architectures(
ARMLE="armle",
MIPSBE="mipsbe",
......@@ -40,6 +38,7 @@ Architectures = architectures(
PYTHON="python",
)
payload_handlers = namedtuple("PayloadHandlers", ["BIND_TCP", "REVERSE_TCP"])
PayloadHandlers = payload_handlers(
BIND_TCP="bind_tcp",
REVERSE_TCP="reverse_tcp",
......
......@@ -50,23 +50,39 @@ def __cprint(*args, **kwargs):
printer_queue.put(PrintResource(content=args, sep=sep, end=end, file=file_, thread=thread))
def print_error(*args, **kwargs):
def print_error(*args, **kwargs) -> None:
""" Print error message prefixing it with [-]
"""
__cprint("\033[91m[-]\033[0m", *args, **kwargs)
def print_status(*args, **kwargs):
def print_status(*args, **kwargs) -> None:
""" Print status message prefixing it with [-]
"""
__cprint("\033[94m[*]\033[0m", *args, **kwargs)
def print_success(*args, **kwargs):
def print_success(*args, **kwargs) -> None:
""" Print success message prefixing it with [-]
"""
__cprint("\033[92m[+]\033[0m", *args, **kwargs)
def print_info(*args, **kwargs):
def print_info(*args, **kwargs) -> None:
""" Print info message prefixing it with [-]
"""
__cprint(*args, **kwargs)
def print_table(headers, *args, **kwargs):
def print_table(headers, *args, **kwargs) -> None:
""" Print table.
example:
......@@ -125,7 +141,7 @@ def print_table(headers, *args, **kwargs):
print_info()
def pprint_dict_in_order(dictionary, order=None):
def pprint_dict_in_order(dictionary, order=None) -> None:
""" Pretty dict print.
Pretty printing dictionary in specific order. (as in 'show info' command)
......@@ -168,16 +184,31 @@ def pprint_dict_in_order(dictionary, order=None):
prettyprint(rest_keys, dictionary[rest_keys])
def color_blue(string):
""" Returns string colored with blue """
def color_blue(string: str) -> str:
""" Returns string colored with blue
:param str string:
:return str:
"""
return "\033[94m{}\033[0m".format(string)
def color_green(string):
""" Returns string colored with green """
def color_green(string: str) -> str:
""" Returns string colored with green
:param str string:
:return str:
"""
return "\033[92m{}\033[0m".format(string)
def color_red(string):
""" Returns string colored with red """
def color_red(string: str) -> str:
""" Returns string colored with red
:param str string:
:return str:
"""
return "\033[91m{}\033[0m".format(string)
......@@ -15,12 +15,23 @@ MODULES_DIR = rsf_modules.__path__[0]
WORDLISTS_DIR = wordlists.__path__[0]
def random_text(length, alph=string.ascii_letters + string.digits):
return ''.join(random.choice(alph) for _ in range(length))
def random_text(length: int, alph: str=string.ascii_letters + string.digits) -> str:
""" Generates random string text
:param int length: length of text to generate
:param str alph: string of all possible characters to choose from
:return str: generated random string of specified size
"""
return "".join(random.choice(alph) for _ in range(length))
def is_ipv4(address):
""" Checks if given address is valid IPv4 address """
def is_ipv4(address: str) -> bool:
""" Checks if given address is valid IPv4 address
:param str address: IP address to check
:return bool: True if address is valid IPv4 address, False otherwise
"""
regexp = "^(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$"
if re.match(regexp, address):
......@@ -29,8 +40,12 @@ def is_ipv4(address):
return False
def is_ipv6(address):
""" Checks if given address is valid IPv6 address """
def is_ipv6(address: str) -> bool:
""" Checks if given address is valid IPv6 address
:param str address: IP address to check
:return bool: True if address is valid IPv6 address, False otherwise
"""
regexp = "^(?:(?:[0-9A-Fa-f]{1,4}:){6}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|::(?:[0-9A-Fa-f]{1,4}:){5}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){4}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){3}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,2}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:){2}(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,3}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}:(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,4}[0-9A-Fa-f]{1,4})?::(?:[0-9A-Fa-f]{1,4}:[0-9A-Fa-f]{1,4}|(?:(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}(?:[0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5]))|(?:(?:[0-9A-Fa-f]{1,4}:){,5}[0-9A-Fa-f]{1,4})?::[0-9A-Fa-f]{1,4}|(?:(?:[0-9A-Fa-f]{1,4}:){,6}[0-9A-Fa-f]{1,4})?::)%.*$"
......@@ -40,8 +55,12 @@ def is_ipv6(address):
return False
def convert_ip(address):
""" Converts IP to bytes """
def convert_ip(address: str) -> bytes:
""" Converts IP to bytes
:param str address: IP address that should be converted to bytes
:return bytes: IP converted to bytes format
"""
res = b""
for i in address.split("."):
......@@ -49,15 +68,23 @@ def convert_ip(address):
return res
def convert_port(port):
""" Converts Port to bytes """
def convert_port(port: int) -> bytes:
""" Converts Port to bytes
:param int port: port that should be conveted to bytes
:return bytes: port converted to bytes format
"""
res = "%.4x" % int(port)
return bytes.fromhex(res)
def index_modules(modules_directory=MODULES_DIR):
""" Returns list of all exploits modules """
def index_modules(modules_directory: str=MODULES_DIR) -> list:
""" Returns list of all exploits modules
:param str modules_directory: path to modules directory
:return list: list of found modules
"""
modules = []
for root, dirs, files in os.walk(modules_directory):
......@@ -69,10 +96,10 @@ def index_modules(modules_directory=MODULES_DIR):
return modules
def import_exploit(path):
def import_exploit(path: str):
""" Imports exploit module
:param path: absolute path to exploit e.g. routersploit.modules.exploits.asus_auth_bypass
:param str path: absolute path to exploit e.g. routersploit.modules.exploits.asus_auth_bypass
:return: exploit module or error
"""
......@@ -96,8 +123,12 @@ def import_exploit(path):
)
def iter_modules(modules_directory=MODULES_DIR):
""" Iterates over valid modules """
def iter_modules(modules_directory: str=MODULES_DIR) -> list:
""" Iterates over valid modules
:param str modules_directory: path to modules directory
:return list: list of found modules
"""
modules = index_modules(modules_directory)
modules = map(lambda x: "".join(["routersploit.modules.", x]), modules)
......@@ -105,22 +136,25 @@ def iter_modules(modules_directory=MODULES_DIR):
yield import_exploit(path)
def pythonize_path(path):
def pythonize_path(path: str) -> str:
""" Replaces argument to valid python dotted notation.
ex. foo/bar/baz -> foo.bar.baz
:param str path: path to pythonize
:return str: pythonized path
"""
return path.replace("/", ".")
def humanize_path(path):
def humanize_path(path: str) -> str:
""" Replace python dotted path to directory-like one.
ex. foo.bar.baz -> foo/bar/baz
:param path: path to humanize
:return: humanized path
:param str path: path to humanize
:return str: humanized path
"""
return path.replace(".", "/")
......@@ -132,6 +166,7 @@ def module_required(fn):
Decorator that checks if any module is activated
before executing command specific to modules (ex. 'run').
"""
@wraps(fn)
def wrapper(self, *args, **kwargs):
if not self.current_module:
......@@ -176,8 +211,12 @@ def stop_after(space_number):
return _outer_wrapper
def lookup_vendor(addr):
""" Lookups vendor (manufacturer) based on MAC address """
def lookup_vendor(addr: str) -> str:
""" Lookups vendor (manufacturer) based on MAC address
:param str addr: MAC address to lookup
:return str: vendor name from oui.dat database
"""
addr = addr.upper().replace(":", "")
......@@ -203,7 +242,7 @@ class Version(object):
self.value = value
def __lt__(self, other):
"""Override the default x<y"""
""" Override the default x<y """
if self._compare_versions(self.value, other.value) < 0:
return True
return False
......@@ -236,11 +275,14 @@ class Version(object):
@staticmethod
def _compare_versions(version1, version2):
"""
Simple and dirty implementation
if version1 < version2 then -1
if version1 == version2 then 0
if version1 > version2 then 1
""" Version comparision
:param Version version1:
:param Version version2:
:return int:
if version1 < version2 then -1
if version1 == version2 then 0
if version1 > version2 then 1
"""
arr1 = re.sub("\D", ".", str(version1)).split(".")
......@@ -260,7 +302,14 @@ class Version(object):
return 0
def detect_file_content(content, f="/etc/passwd"):
def detect_file_content(content: str, f: str="/etc/passwd") -> bool:
""" Detect specific file content in content
:param str content: file content that should be analyzed
:param str f: file that the content should be compared with
:return bool: True if the content was recognized, False otherwise
"""
if f in ["/etc/passwd", "/etc/shadow"]:
if re.findall(r"(root|[aA]dmin):.*?:.*?:.*?:.*?:.*?:", content):
return True
......
......@@ -12,9 +12,18 @@ FTP_TIMEOUT = 8.0
class FTPCli(object):
""" FTP Client """
""" FTP Client provides methods to handle communication with FTP server """
def __init__(self, ftp_target: str, ftp_port: int, ssl: bool=False, verbosity: bool=False) -> None:
""" FTP client constructor
:param str ftp_target: target FTP server ip address
:param int ftp_port: target FTP server port
:param bool ssl: target FTP ssl enabled
:param bool verbosity: display verbose output
:return None:
"""
def __init__(self, ftp_target, ftp_port, ssl=False, verbosity=False):
self.ftp_target = ftp_target
self.ftp_port = ftp_port
self.verbosity = verbosity
......@@ -26,37 +35,61 @@ class FTPCli(object):
else:
self.ftp_client = ftplib.FTP()
def connect(self, retries=1):
def connect(self, retries: int=1) -> bool:
""" Connect to FTP server
:param int retries: number of retry attempts
:return bool: True if connection was successful, False otherwise
"""
for _ in range(retries):
try:
self.ftp_client.connect(self.ftp_target, self.ftp_port, timeout=FTP_TIMEOUT)
return self.ftp_client
return True
except Exception as err:
print_error(self.peer, "FTP Error while connecting to the server", err, verbose=self.verbosity)
self.ftp_client.close()
return None
return False
def login(self, username: str, password: str) -> bool:
""" Login to FTP server
:param str username: FTP account username
:param str password: FTP account password
:return bool: True if login was successful, False otherwise
"""
def login(self, username, password):
try:
self.ftp_client.login(username, password)
print_success(self.peer, "FTP Authentication Successful - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
return self.ftp_client
return True
except Exception as err:
print_error(self.peer, "FTP Authentication Failed - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
self.ftp_client.close()
return None
return False
def test_connect(self) -> bool:
""" Test connection to FTP server
:return bool: True if connection was successful, False otherwise
"""
def test_connect(self):
if self.connect():
self.ftp_client.close()
return True
return False
def get_content(self, remote_file):
def get_content(self, remote_file: str) -> str:
""" Get remote file from FTP server
:param str remote_file: remote file name
:return str: remote file content
"""
try:
fp_content = io.BytesIO()
self.ftp_client.retrbinary("RETR {}".format(remote_file), fp_content.write)
......@@ -66,24 +99,37 @@ class FTPCli(object):
return None
def close(self):
def close(self) -> bool:
""" Close FTP connection
:return bool: True if closing connection was successful, False otherwise
"""
try:
self.ftp_client.close()
return True
except Exception as err:
print_error(self.peer, "FTP Error while closing connection", err, verbose=self.verbosity)
return None
return False
class FTPClient(Exploit):
""" FTP Client exploit option and api """
""" FTP Client exploit """
target_protocol = Protocol.FTP
ssl = OptBool(False, "SSL enabled: true/false")
verbosity = OptBool(True, "Enable verbose output: true/false")
def ftp_create(self, target=None, port=None):
def ftp_create(self, target: str=None, port: int=None) -> FTPCli:
""" Create FTP client
:param str target: target FTP server ip address
:param int port: target FTP server port
:return FTPCli: FTP client object
"""
ftp_target = target if target else self.target
ftp_port = port if port else self.port
......
......@@ -14,14 +14,23 @@ HTTP_TIMEOUT = 30.0
class HTTPClient(Exploit):
""" HTTP Client exploit """
""" HTTP Client provides methods to handle communication with HTTP server """
target_protocol = Protocol.HTTP
verbosity = OptBool(True, "Verbosity enabled: true/false")
ssl = OptBool(False, "SSL enabled: true/false")
def http_request(self, method, path, session=requests, **kwargs):
def http_request(self, method: str, path: str, session: requests=requests, **kwargs) -> requests.Response:
""" Requests HTTP resource
:param str method: method that should be issued e.g. GET, POST
:param str path: path to the resource that should be requested
:param requests session: session manager that should be used
:param kwargs: kwargs passed to request method
:return Response: Response object
"""
if self.ssl:
url = "https://"
else:
......@@ -48,7 +57,13 @@ class HTTPClient(Exploit):
return None
def get_target_url(self, path=""):
def get_target_url(self, path: str="") -> str:
""" Get target URL
:param str path: path to http server resource
:return str: full target url with correct schema
"""
if self.ssl:
url = "https://"
else:
......@@ -58,7 +73,12 @@ class HTTPClient(Exploit):
return url
def http_test_connect(self):
def http_test_connect(self) -> bool:
""" Test connection to HTTP server
:return bool: True if test connection was successful, False otherwise
"""
response = self.http_request(
method="GET",
path="/"
......
......@@ -11,16 +11,33 @@ SNMP_TIMEOUT = 15.0
class SNMPCli(object):
""" SNMP Client """
""" SNMP Client provides methods to handle communication with SNMP server """
def __init__(self, snmp_target: str, snmp_port: int, verbosity: bool=False) -> None:
""" SNMP client constructor
:param str snmp_target: target SNMP server ip address
:param port snmp_port: target SNMP server port
:param bool verbosity: display verbose output
:return None:
"""
def __init__(self, snmp_target, snmp_port, verbosity=False):
self.snmp_target = snmp_target
self.snmp_port = snmp_port
self.verbosity = verbosity
self.peer = "{}:{}".format(self.snmp_target, snmp_port)
def get(self, community_string, oid, version=1, retries=0):
def get(self, community_string: str, oid: str, version: int=1, retries: int=0) -> bytes:
""" Get OID from SNMP server
:param str community_string: SNMP server communit string
:param str oid: SNMP server oid
:param int version: SNMP protocol version
:param int retries: number of retries
:return bytes: SNMP server response
"""
cmdGen = cmdgen.CommandGenerator()
try:
......@@ -49,7 +66,14 @@ class SNMPClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def snmp_create(self, target=None, port=None):
def snmp_create(self, target: str=None, port: int=None) -> SNMPCli:
""" Create SNMP client
:param str target: target SNMP server ip address
:param int port: target SNMP server port
:return SNMPCli: SNMP client object
"""
snmp_target = target if target else self.target
snmp_port = port if port else self.port
......
......@@ -18,7 +18,17 @@ SSH_TIMEOUT = 8.0
class SSHCli(object):
def __init__(self, ssh_target, ssh_port, verbosity):
""" SSH Client provides methods to handle communication with SSH server """
def __init__(self, ssh_target: str, ssh_port: int, verbosity=False) -> None:
""" SSH client constructor
:param str ssh_target: SSH target ip address
:param int ssh_port: SSH port number
:param bool verbosity: display verbose output
:return None:
"""
self.ssh_target = ssh_target
self.ssh_port = ssh_port
self.verbosity = verbosity
......@@ -28,7 +38,15 @@ class SSHCli(object):
self.ssh_client = paramiko.SSHClient()
self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def login(self, username, password, retries=1):
def login(self, username: str, password: str, retries: int=1) -> bool:
""" Login to SSH server
:param str username: SSH account username
:param str password: SSH account password
:param int retries: number of login retries
:return bool: True if login was successful, False otherwise
"""
for _ in range(retries):
try:
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, banner_timeout=SSH_TIMEOUT, username=username, password=password, look_for_keys=False)
......@@ -40,19 +58,27 @@ class SSHCli(object):
print_error(self.peer, "SSH Error while authenticating", err, verbose=self.verbosity)
else:
print_success(self.peer, "SSH Authentication Successful - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
return self.ssh_client
return True
self.ssh_client.close()
return None
return False
def login_pkey(self, username: str, priv_key: str, retries: int=1) -> bool:
""" Login to SSH server with private key
:param str username: SSH account username
:param str priv_key: SSH account private key
:param int retries: number of login retries
:return bool: True if login was successful, False otherwise
"""
def login_pkey(self, username, priv_key, retries=1):
if "DSA PRIVATE KEY" in priv_key:
priv_key = paramiko.DSSKey.from_private_key(io.StringIO(priv_key))
elif "RSA PRIVATE KEY" in priv_key:
priv_key = paramiko.RSAKey.from_private_key(io.StringIO(priv_key))
else:
return None
return False
for _ in range(retries):
try:
......@@ -63,13 +89,18 @@ class SSHCli(object):
print_error(self.peer, "SSH Error while authenticated by using private key", err, verbose=self.verbosity)
else:
print_success(self.peer, "SSH Authentication Successful - Username: '{}' with private key".format(username), verbose=self.verbosity)
return self.ssh_client
return True
self.ssh_client.close()
return None
return False
def test_connect(self) -> bool:
""" Test connection to SSH server
:return bool: True if test connection was successful, False otherwise
"""
def test_connect(self):
try:
self.ssh_client.connect(self.ssh_target, self.ssh_port, timeout=SSH_TIMEOUT, username="root", password=random_text(12), look_for_keys=False)
except paramiko.AuthenticationException:
......@@ -81,7 +112,13 @@ class SSHCli(object):
self.ssh_client.close()
return False
def execute(self, cmd):
def execute(self, cmd: str) -> str:
""" Execute command on SSH server
:param str cmd: command to execute on SSH server
:return str: command output
"""
try:
ssh_stdin, ssh_stdout, ssh_stderr = self.ssh_client.exec_command(cmd)
return ssh_stdout.read()
......@@ -90,16 +127,31 @@ class SSHCli(object):
return None
def get_file(self, remote_file, local_file):
def get_file(self, remote_file: str, local_file: str) -> bool:
""" Get file from SSH server
:param str remote_file: remote file on SSH server
:param str local_file: local file that it should be saved to
:return bool: True if getting file was successful, False otherwise
"""
try:
sftp = self.ssh_client.open_sftp()
sftp.get(remote_file, local_file)
return True
except Exception as err:
print_error(self.peer, "SSH Error while retrieving file from the server", err, verbose=self.verbosity)
return None
return False
def get_content(self, remote_file: str) -> str:
""" Get file content from SSH server
:param str remote_file: remote file on SSH server
:return str: file content from SSH server
"""
def get_content(self, remote_file):
try:
fp_content = io.BytesIO()
sftp = self.ssh_client.open_sftp()
......@@ -111,33 +163,60 @@ class SSHCli(object):
return None
def send_file(self, local_file, dest_file):
def send_file(self, local_file: str, dest_file: str) -> bool:
""" Send file to SSH server
:param str local_file: local file that should be send to SSH server
:param str dest_file: destination file that content should be saved to
:return bool: True if sending file was successful, False otherwise
"""
try:
sftp = self.ssh_client.open_sftp()
sftp.put(local_file, dest_file)
return True
except Exception as err:
print_error(self.peer, "SSH Error while sending file to the server", err, verbose=self.verbosity)
return None
return False
def send_content(self, content: str, dest_file: str) -> bool:
""" Send file content to SSH server
:param str content: data that should be sent to SSH file
:param str dst_file: destination file that data should be saved to
:return bool: True if sending file content was successful, False otherwise
"""
def send_content(self, content, dest_file):
try:
fp_content = io.BytesIO(content)
sftp = self.ssh_client.open_sftp()
sftp.putfo(fp_content, dest_file)
return True
except Exception as err:
print_error(self.peer, "SSH Error while sending content to the server", err, verbose=self.verbosity)
return None
return False
def interactive(self) -> None:
""" Start interactive mode with SSH server
:return None:
"""
def interactive(self):
chan = self.ssh_client.invoke_shell()
if os.name == "posix":
self._posix_shell(chan)
else:
self._windows_shell(chan)
def _posix_shell(self, chan):
def _posix_shell(self, chan: paramiko.channel.Channel) -> None:
""" Start posix shell with SSH server
:param paramiko.channel.Channel chan: channel for communicating with SSH server
:return None:
"""
import termios
import tty
......@@ -169,7 +248,13 @@ class SSHCli(object):
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
return
def _windows_shell(self, chan):
def _windows_shell(self, chan: paramiko.channel.Channel) -> None:
""" Start Windows shell with SSH server
:param paramiko.channel.Channel chan: channel for communicating with SSH server
:return None:
"""
def writeall(sock):
while True:
data = sock.recv(256)
......@@ -194,13 +279,19 @@ class SSHCli(object):
except Exception as err:
print_error("Error", err, verbose=self.verbosity)
def close(self):
def close(self) -> bool:
""" Close SSH connection
:return bool: True if closing connection was successful, False otherwise
"""
try:
self.ssh_client.close()
return True
except Exception as err:
print_error(self.peer, "SSH Error while closing connection", verbose=self.verbosity)
return None
return False
class SSHClient(Exploit):
......@@ -210,7 +301,14 @@ class SSHClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def ssh_create(self, target=None, port=None):
def ssh_create(self, target: str=None, port: int=None) -> SSHCli:
""" Create SSH client
:param str target: target SSH server ip address
:param int port: target SSH server port
:return SSHCli: SSH client object
"""
ssh_target = target if target else self.target
ssh_port = port if port else self.port
......
......@@ -13,7 +13,17 @@ TCP_SOCKET_TIMEOUT = 8.0
class TCPCli(object):
def __init__(self, tcp_target, tcp_port, verbosity=False):
""" TCP Client provides methods to handle communication with TCP server """
def __init__(self, tcp_target: str, tcp_port: int, verbosity: bool=False) -> None:
""" TCP client constructor
:param str tcp_target: target TCP server ip address
:param int tcp_port: target TCP server port
:param bool verbosity: display verbose output
:return None:
"""
self.tcp_target = tcp_target
self.tcp_port = tcp_port
self.verbosity = verbosity
......@@ -30,26 +40,42 @@ class TCPCli(object):
self.tcp_client.settimeout(TCP_SOCKET_TIMEOUT)
def connect(self):
def connect(self) -> bool:
""" Connect to TCP server
:return bool: True if connection was successful, False otherwise
"""
try:
self.tcp_client.connect((self.tcp_target, self.tcp_port))
print_status(self.peer, "TCP Connection established", verbose=self.verbosity)
return self.tcp_client
return True
except Exception as err:
print_error(self.peer, "TCP Error while connecting to the server", err, verbose=self.verbosity)
return None
return False
def send(self, data: bytes) -> bool:
""" Send data to TCP server
def send(self, data):
:param bytes data: data that should be sent to TCP server
:return bool: True if sending data was successful, False otherwise
"""
try:
return self.tcp_client.send(data)
self.tcp_client.send(data)
return True
except Exception as err:
print_error(self.peer, "TCP Error while sending data", err, verbose=self.verbosity)
return None
return False
def recv(self, num: int) -> bytes:
""" Receive data from TCP server
:param int num: number of bytes that should be received from the server
:return bytes: data that was received from the server
"""
def recv(self, num):
try:
response = self.tcp_client.recv(num)
return response
......@@ -58,7 +84,13 @@ class TCPCli(object):
return None
def recv_all(self, num):
def recv_all(self, num: int) -> bytes:
""" Receive all data sent by the server
:param int num: number of total bytes that should be received
:return bytes: data that was received from the server
"""
try:
response = b""
received = 0
......@@ -77,13 +109,19 @@ class TCPCli(object):
return None
def close(self):
def close(self) -> bool:
""" Close connection to TCP server
:return bool: True if closing connection was successful, False otherwise
"""
try:
self.tcp_client.close()
return True
except Exception as err:
print_error(self.peer, "TCP Error while closing tcp socket", err, verbose=self.verbosity)
return None
return False
class TCPClient(Exploit):
......@@ -93,7 +131,14 @@ class TCPClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def tcp_create(self, target=None, port=None):
def tcp_create(self, target: str=None, port: int=None) -> TCPCli:
""" Creates TCP client
:param str target: target TCP server ip address
:param int port: target TCP server port
:return TCPCli: TCP client object
"""
tcp_target = target if target else self.target
tcp_port = port if port else self.port
......
......@@ -11,7 +11,17 @@ TELNET_TIMEOUT = 30.0
class TelnetCli(object):
def __init__(self, telnet_target, telnet_port, verbosity=False):
""" Telnet Client provides methods to handle communication with Telnet server """
def __init__(self, telnet_target: str, telnet_port: int, verbosity=False) -> None:
""" Telnet client constructor
:param str telnet_target: target Telnet server ip address
:param int telnet_port: target Telnet server port
:param bool verbosity: display verbose output
:return None:
"""
self.telnet_target = telnet_target
self.telnet_port = telnet_port
self.verbosity = verbosity
......@@ -20,16 +30,29 @@ class TelnetCli(object):
self.telnet_client = None
def connect(self):
def connect(self) -> bool:
""" Connect to Telnet server
:return bool: True if connection was successful, False otherwise
"""
try:
self.telnet_client = telnetlib.Telnet(self.telnet_target, self.telnet_port, timeout=TELNET_TIMEOUT)
return self.telnet_client
return True
except Exception as err:
print_error(self.peer, "Telnet Error while connecting to the server", err, verbose=self.verbosity)
return None
return False
def login(self, username: str, password: str, retries: int=1) -> bool:
""" Login to Telnet server
:param str username: Telnet account username
:param str password: Telnet account password
:param int retries: number of authentication retries
:return bool: True if login was successful, False otherwise
"""
def login(self, username, password, retries=1):
for _ in range(retries):
try:
if not self.connect():
......@@ -45,16 +68,21 @@ class TelnetCli(object):
if i == -1 and any([x in res for x in [b"#", b"$", b">"]]) or len(res) > 500: # big banner e.g. mikrotik
print_success(self.peer, "Telnet Authentication Successful - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
return self.telnet_client
return True
else:
print_error(self.peer, "Telnet Authentication Failed - Username: '{}' Password: '{}'".format(username, password), verbose=self.verbosity)
break
except Exception as err:
print_error(self.peer, "Telnet Error while authenticating to the server", err, verbose=self.verbosity)
return None
return False
def test_connect(self) -> bool:
""" Test connection to Telnet server
:return bool: True if test connection was successful, False otherwise
"""
def test_connect(self):
try:
self.telnet_client = telnetlib.Telnet(self.telnet_target, self.telnet_port, timeout=TELNET_TIMEOUT)
self.telnet_client.expect([b"Login: ", b"login: ", b"Username: ", b"username: "], 5)
......@@ -66,10 +94,21 @@ class TelnetCli(object):
return False
def interactive(self):
def interactive(self) -> None:
""" Start interactive mode with Telnet server
:return None:
"""
self.telnet_client.interact()
def read_until(self, data):
def read_until(self, data: bytes) -> bytes:
""" Read until specified data found in response
:param bytes data: bytes until which data should be read
:return bytes: bytes read until data
"""
try:
response = self.telnet_client.read_until(data, 5)
return response
......@@ -78,21 +117,34 @@ class TelnetCli(object):
return None
def write(self, data):
def write(self, data: bytes) -> bool:
""" Write data to Telnet server
:param bytes data: data that should be written to Telnet server
:return bool: True if data was written successfuly, False otherwise
"""
try:
return self.telnet_client.write(data, 5)
self.telnet_client.write(data, 5)
return True
except Exception as err:
print_error(self.peer, "Telnet Error while writing to the server", err, verbose=self.verbosity)
return None
return False
def close(self) -> bool:
""" Close connection to Telnet server
:return bool: True if closing connection was successful, False otherwise
"""
def close(self):
try:
self.telnet_client.close()
return True
except Exception as err:
print_error(self.peer, "Telnet Error while closing connection", err, verbose=self.verbosity)
return None
return False
class TelnetClient(Exploit):
......@@ -102,7 +154,14 @@ class TelnetClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def telnet_create(self, target=None, port=None):
def telnet_create(self, target: str=None, port: int=None) -> TelnetCli:
""" Create Telnet client
:param str target: target Telnet ip address
:param int port: target Telnet port
:return TelnetCli: Telnet client object
"""
telnet_target = target if target else self.target
telnet_port = port if port else self.port
......
......@@ -12,7 +12,17 @@ UDP_SOCKET_TIMEOUT = 8.0
class UDPCli(object):
def __init__(self, udp_target, udp_port, verbosity=False):
""" UDP Client provides methods to handle communication with UDP server """
def __init__(self, udp_target: str, udp_port: int, verbosity: bool=False) -> None:
""" UDP client constructor
:param str udp_target: target UDP server ip address
:param int udp_port: target UDP server port
:param bool verbosity: display verbose output
:return None:
"""
self.udp_target = udp_target
self.udp_port = udp_port
self.verbosity = verbosity
......@@ -29,15 +39,28 @@ class UDPCli(object):
self.udp_client.settimeout(UDP_SOCKET_TIMEOUT)
def send(self, data):
def send(self, data: bytes) -> bool:
""" Send UDP data
:param bytes data: data that should be sent to the server
:return bool: True if data was sent, False otherwise
"""
try:
return self.udp_client.sendto(data, (self.udp_target, self.udp_port))
self.udp_client.sendto(data, (self.udp_target, self.udp_port))
return True
except Exception as err:
print_error(self.peer, "Error while sending data", err, verbose=self.verbosity)
return None
return False
def recv(self, num: int) -> bytes:
""" Receive UDP data
:param int num: number of bytes that should received from the server
:return bytes: bytes received from the server
"""
def recv(self, num):
try:
response = self.udp_client.recv(num)
return response
......@@ -46,13 +69,19 @@ class UDPCli(object):
return None
def close(self):
def close(self) -> bool:
""" Close UDP connection
:return bool: True if connection was closed successful, False otherwise
"""
try:
self.udp_client.close()
return True
except Exception as err:
print_error(self.peer, "Error while closing udp socket", err, verbose=self.verbosity)
return None
return False
class UDPClient(Exploit):
......@@ -62,7 +91,14 @@ class UDPClient(Exploit):
verbosity = OptBool(True, "Enable verbose output: true/false")
def udp_create(self, target=None, port=None):
def udp_create(self, target: str=None, port: int=None) -> UDPCli:
""" Create UDP client
:param str target: target UDP server ip address
:param int port: target UDP server port
:return UDPCli: UDP client object
"""
udp_target = target if target else self.target
udp_port = port if port else self.port
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment