184 lines
6.9 KiB
Python
184 lines
6.9 KiB
Python
# Author: Jan Koscielniak, (c) 2020
|
|
# Released under Gnu GPL v2.0, see LICENSE file for details
|
|
try:
|
|
import unittest2 as unittest
|
|
except ImportError:
|
|
import unittest
|
|
|
|
try:
|
|
import mock
|
|
except ImportError:
|
|
import unittest.mock as mock
|
|
|
|
import sys
|
|
from socket import inet_aton
|
|
from os.path import join, dirname, abspath
|
|
|
|
from tlsfuzzer.utils.log import Log
|
|
|
|
failed_import = False
|
|
try:
|
|
from tlsfuzzer.extract import Extract, main, help_msg
|
|
except ImportError:
|
|
failed_import = True
|
|
|
|
|
|
@unittest.skipIf(failed_import,
|
|
"Could not import extraction. Skipping related tests.")
|
|
class TestHostnameToIp(unittest.TestCase):
|
|
|
|
def test_valid_ip(self):
|
|
ip_addr = "127.0.0.1"
|
|
self.assertEqual(Extract.hostname_to_ip(ip_addr), inet_aton(ip_addr))
|
|
|
|
def test_invalid_ip(self):
|
|
invalid_ip_addr = "256.0.0.1"
|
|
with self.assertRaises(Exception):
|
|
Extract.hostname_to_ip(invalid_ip_addr)
|
|
|
|
def test_valid_hostname(self):
|
|
hostname = "localhost"
|
|
self.assertEqual(Extract.hostname_to_ip(hostname),
|
|
inet_aton("127.0.0.1"))
|
|
|
|
def test_invalid_hostname(self):
|
|
invalid_hostname = "#invalidhostname*"
|
|
with self.assertRaises(Exception):
|
|
Extract.hostname_to_ip(invalid_hostname)
|
|
|
|
|
|
@unittest.skipIf(failed_import,
|
|
"Could not import extraction. Skipping related tests.")
|
|
class TestExtraction(unittest.TestCase):
|
|
def setUp(self):
|
|
self.logfile = join(dirname(abspath(__file__)), "test.log")
|
|
log_content = "A,B\n1,0\n0,1\n1,0\n0,1\n0,1\n0,1\n0,1\n1,0\n1,0\n1,0\n"
|
|
self.expected = (
|
|
"A,B\n"
|
|
"0.000758130,0.000747009\n"
|
|
"0.000696718,0.000920462\n"
|
|
"0.000980080,0.001327954\n"
|
|
"0.000988899,0.000904547\n"
|
|
"0.000875510,0.000768453\n"
|
|
"0.000734843,0.000752226\n"
|
|
"0.000754852,0.000862102\n"
|
|
"0.000667378,0.000706491\n"
|
|
"0.000671230,0.000668237\n"
|
|
"0.000790935,0.000992733\n")
|
|
self.time_vals = "\n".join(["some random header"] +
|
|
list(str(i) for i in range(20)))
|
|
# fix mock not supporting iterators
|
|
self.mock_log = mock.mock_open(read_data=log_content)
|
|
self.mock_log.return_value.__iter__ = lambda s: iter(s.readline, '')
|
|
|
|
with mock.patch('__main__.__builtins__.open', self.mock_log):
|
|
self.log = Log(self.logfile)
|
|
self.log.read_log()
|
|
|
|
def test_extraction_from_external_time_source(self):
|
|
extract = Extract(self.log, None, "/tmp", None, None,
|
|
join(dirname(abspath(__file__)), "times-log.csv"))
|
|
extract.parse()
|
|
|
|
with mock.patch('__main__.__builtins__.open', mock.mock_open()) as mock_file:
|
|
mock_file.return_value.write.side_effect = lambda s: self.assertIn(
|
|
s.strip(), self.expected.splitlines())
|
|
extract.write_csv('timing.csv')
|
|
|
|
def test_extraction(self):
|
|
extract = Extract(self.log,
|
|
join(dirname(abspath(__file__)), "capture.pcap"),
|
|
"/tmp", "localhost", 4433)
|
|
extract.parse()
|
|
|
|
with mock.patch('__main__.__builtins__.open', mock.mock_open()) as mock_file:
|
|
mock_file.return_value.write.side_effect = lambda s: self.assertIn(
|
|
s.strip(), self.expected.splitlines())
|
|
extract.write_csv('timing.csv')
|
|
|
|
|
|
@unittest.skipIf(failed_import,
|
|
"Could not import extraction. Skipping related tests.")
|
|
class TestCommandLine(unittest.TestCase):
|
|
def test_command_line(self):
|
|
capture = "capture.pcap"
|
|
logfile = "log.csv"
|
|
host = "localhost"
|
|
port = "4433"
|
|
output = "/tmp"
|
|
args = ["extract.py",
|
|
"-l", logfile,
|
|
"-c", capture,
|
|
"-h", host,
|
|
"-p", port,
|
|
"-o", output]
|
|
mock_init = mock.Mock()
|
|
mock_init.return_value = None
|
|
with mock.patch('tlsfuzzer.extract.Extract.parse'):
|
|
with mock.patch('tlsfuzzer.extract.Extract.__init__', mock_init):
|
|
with mock.patch('tlsfuzzer.extract.Extract.write_csv'):
|
|
with mock.patch('tlsfuzzer.extract.Log') as mock_log:
|
|
with mock.patch("sys.argv", args):
|
|
main()
|
|
mock_log.assert_called_once_with(logfile)
|
|
mock_init.assert_called_once_with(
|
|
mock.ANY, capture, output, host, int(port),
|
|
None)
|
|
|
|
def test_raw_times(self):
|
|
raw_times = "times-log.csv"
|
|
logfile = "log.csv"
|
|
output = "/tmp"
|
|
args = ["extract.py",
|
|
"-l", logfile,
|
|
"-o", output,
|
|
"--raw-times", raw_times]
|
|
mock_init = mock.Mock()
|
|
mock_init.return_value = None
|
|
with mock.patch('tlsfuzzer.extract.Extract.parse'):
|
|
with mock.patch('tlsfuzzer.extract.Extract.__init__', mock_init):
|
|
with mock.patch('tlsfuzzer.extract.Extract.write_csv'):
|
|
with mock.patch('tlsfuzzer.extract.Log') as mock_log:
|
|
with mock.patch("sys.argv", args):
|
|
main()
|
|
mock_log.assert_called_once_with(logfile)
|
|
mock_init.assert_called_once_with(
|
|
mock.ANY, None, output, None, None,
|
|
raw_times)
|
|
|
|
def test_help(self):
|
|
args = ["extract.py", "--help"]
|
|
with mock.patch('tlsfuzzer.extract.help_msg') as help_mock:
|
|
with mock.patch("sys.argv", args):
|
|
self.assertRaises(SystemExit, main)
|
|
help_mock.assert_called_once()
|
|
|
|
def test_help_msg(self):
|
|
with mock.patch('__main__.__builtins__.print') as print_mock:
|
|
help_msg()
|
|
self.assertGreaterEqual(print_mock.call_count, 1)
|
|
|
|
def test_missing_output(self):
|
|
args = ["extract.py"]
|
|
with mock.patch("sys.argv", args):
|
|
self.assertRaises(SystemExit, main)
|
|
|
|
def test_incompatible_options(self):
|
|
args = ["extract.py", "-c", "capture.pcap", "--raw-times",
|
|
"times-log.csv"]
|
|
with mock.patch("sys.argv", args):
|
|
with self.assertRaises(ValueError):
|
|
main()
|
|
|
|
def test_incomplete_packet_capture_options(self):
|
|
args = ["extract.py", "-c", "capture.pcap", "-l", "log.csv",
|
|
"-o", "/tmp"]
|
|
with mock.patch("sys.argv", args):
|
|
with self.assertRaises(ValueError):
|
|
main()
|
|
|
|
def test_incomplete_ext_times_options(self):
|
|
args = ["extract.py", "--raw-times", "times-log.csv", "-o", "/tmp"]
|
|
with mock.patch("sys.argv", args):
|
|
with self.assertRaises(ValueError):
|
|
main()
|