Commit 81f13b8f by 文周繁

Initial commit

parents
No related merge requests found
import os
import sys
import logging
import subprocess
import datetime
import os
import time
import signal
import re
pattern_asan_head = re.compile(r'==\d+==ERROR: AddressSanitizer:')
# pattern_asan_shadow = re.compile(r'Shadow bytes around the buggy address:')
def search_file(dirname):
paths = []
for root, dirs, files in os.walk(dirname):
for file in files:
print(file)
if file.startswith("README"):
continue
else:
path = os.path.join(root, file)
paths.append(path)
print(len(paths))
return paths
def TIMEOUT_COMMAND(command, stdout, stderr):
"""call shell-command and either return its output or kill it
if it doesn't normally exit within timeout seconds and return None"""
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
outs, errs = process.communicate()
stdout.write(outs)
if pattern_asan_head.search(errs) is not None:
stderr.write(errs)
def generation_command(target, parameter, paths, stdout_outputfile, stderr_outputfile):
stdout_output = open(stdout_outputfile, "w+")
stderr_output = open(stderr_outputfile, "w+")
for path in paths:
command = target + " " + parameter.replace("@@", path, 1) + " "
print(command)
TIMEOUT_COMMAND(command, stdout_output, stderr_output)
def main(argv):
target = argv[0]
cmd = "@@"
dirname = argv[1]
stdout_outputfile = argv[2]
stderr_outputfile = argv[3]
print("Searching files\n")
paths = search_file(dirname)
generation_command(target, cmd, paths, stdout_outputfile, stderr_outputfile)
if __name__ == "__main__":
main(sys.argv[1:])
import os
import sys
import logging
import subprocess
import datetime
import time
import signal
def search_file(dirname):
paths = []
for root, dirs, files in os.walk(dirname):
for file in files:
print(file)
if file.startswith("README"):
continue
else:
path = os.path.join(root, file)
paths.append(path)
print(len(paths))
return paths
def TIMEOUT_COMMAND(command, fl):
"""call shell-command and either return its output or kill it
if it doesn't normally exit within timeout seconds and return None"""
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
for info in process.communicate():
fl.write(info)
def generation_command(target, parameter, paths, outputfile):
fl = open(outputfile, "w+")
for path in paths:
# print(path)
command = target + " " + parameter.replace("@@", path, 1) + " "
print(command)
ret = TIMEOUT_COMMAND(command, fl)
# 启动服务
def service_on(service, port, conf_file, outputfile):
command = service + " " + conf_file + " " + port
print(command)
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
print("service on")
return process
# 关闭服务
def service_off(process: subprocess.Popen):
process.kill()
def afl_replay(target, afl_replay_path, paths, protocol, port, conf_file, outputfile):
fl = open(outputfile, "w+")
for path in paths:
process = service_on(target, port, conf_file, outputfile)
command = afl_replay_path + " " + path + " " + protocol + " " + port
print(command)
# 使用afl-replay
subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
for info in process.communicate():
try:
fl.write(info.decode("utf-8"))
except UnicodeDecodeError:
fl.write(str(info))
service_off(process)
def main(argv):
target = argv[0] # 被测程序
conf_file = argv[1] # 被测程序配置文件
port = argv[2] # 被测程序开放端口
afl_replay_path = argv[3] # afl_replay_path
dirname = argv[4] # seeds目录
protocol = argv[5] # 协议
outputfile = argv[6] # 输出文件
print("Searching files\n")
paths = search_file(dirname)
afl_replay(target, afl_replay_path, paths, protocol, port, conf_file, outputfile) # afl-replay
if __name__ == "__main__":
main(sys.argv[1:])
import sys
import subprocess
import os
def search_file(dirname):
paths = []
for root, dirs, files in os.walk(dirname):
for file in files:
print(file)
if file.startswith("README"):
continue
else:
path = os.path.join(root, file)
paths.append(path)
print(len(paths))
return paths
def TIMEOUT_COMMAND(command, fl):
"""call shell-command and either return its output or kill it
if it doesn't normally exit within timeout seconds and return None"""
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
for info in process.communicate():
fl.write(info)
def generation_command(target, parameter, paths, outputfile):
fl = open(outputfile, "w+")
for path in paths:
command = "valgrind " + target + " " + parameter.replace("@@", path, 1) + " "
print(command)
ret = TIMEOUT_COMMAND(command, fl)
# 启动服务
def service_on(service, port, conf_file, outputfile):
command = "valgrind" + " " + service + " " + conf_file + " " + port
print(command)
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
print("service on")
return process
# 关闭服务
def service_off(process: subprocess.Popen):
process.kill()
def afl_replay(target, afl_replay_path, paths, protocol, port, conf_file, outputfile):
fl = open(outputfile, "w+")
for path in paths:
process = service_on(target, port, conf_file, outputfile)
command = afl_replay_path + " " + path + " " + protocol + " " + port
print(command)
# 使用afl-replay
subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
for info in process.communicate():
try:
fl.write(info.decode("utf-8"))
except UnicodeDecodeError:
fl.write(str(info))
service_off(process)
def main(argv):
target = argv[0] # 被测程序
conf_file = argv[1] # 被测程序配置文件
port = argv[2] # 被测程序开放端口
afl_replay_path = argv[3] # afl_replay_path
dirname = argv[4] # seeds目录
protocol = argv[5] # 协议
outputfile = argv[6] # 输出文件
print("Searching files\n")
paths = search_file(dirname)
afl_replay(target, afl_replay_path, paths, protocol, port, conf_file, outputfile) # afl-replay
if __name__ == "__main__":
main(sys.argv[1:])
import sys
import logging
import subprocess
import datetime
import os
import time
import signal
import re
import string
pattern_valgrind_head = re.compile(r'==\d+==')
pattern_valgrind_tail = re.compile(r'==\d+== ERROR SUMMARY: [1-9]+')
pattern_valgrind_at = re.compile(r'==\d+== {4}at 0x\w+: ')
pattern_valgrind_by = re.compile(r'==\d+== {4}by 0x\w+: ')
invalid_cause_dict = dict()
def search_file(dirname):
paths = []
for root, dirs, files in os.walk(dirname):
for file in files:
print(file)
if file.startswith("README"):
continue
else:
path = os.path.join(root, file)
paths.append(path)
print(len(paths))
return paths
def TIMEOUT_COMMAND(command, stdout, stderr):
"""call shell-command and either return its output or kill it
if it doesn't normally exit within timeout seconds and return None"""
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
outs, errs = process.communicate()
stdout.write(outs)
err_data = ''
is_search_at = False
search_by_count = 0
error_cause = ''
for i in errs.splitlines():
if pattern_valgrind_at.match(i) is not None and not is_search_at:
is_search_at = True
err_data = err_data + i + "\n"
_, end = pattern_valgrind_at.search(i).span()
error_cause += i[end:]
elif pattern_valgrind_by.match(i) is not None and search_by_count <= 10:
search_by_count += 1
err_data = err_data + i + "\n"
_, end = pattern_valgrind_by.search(i).span()
error_cause += i[end:]
elif pattern_valgrind_tail.match(i) is not None:
err_data = err_data + i + "\n"
if not invalid_cause_dict.has_key(error_cause):
stderr.write(err_data)
# TODO write to mangodb
invalid_cause_dict[error_cause] = 1
elif pattern_valgrind_head.match(i) is not None:
err_data = err_data + i + "\n"
else:
pass
def generation_command(target, parameter, paths, stdout_outputfile, stderr_outputfile):
stdout_output = open(stdout_outputfile, "w+")
stderr_output = open(stderr_outputfile, "w+")
for path in paths:
command = "valgrind " + target + " " + parameter.replace("@@", path, 1) + " "
print(command)
TIMEOUT_COMMAND(command, stdout_output, stderr_output)
def main(argv):
target = argv[0] # target program
cmd = "@@"
dirname = argv[1] # seeds dir
stdout_outputfile = argv[2]
stderr_outputfile = argv[3]
print("Searching files\n")
paths = search_file(dirname)
generation_command(target, cmd, paths, stdout_outputfile, stderr_outputfile)
if __name__ == "__main__":
main(sys.argv[1:])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment