# (code-viewer metadata: Python source, 820 lines, 32 KiB)
from __future__ import print_function
|
|
import os
|
|
import csv
|
|
import sys
|
|
import time
|
|
import getopt
|
|
import socket
|
|
import threading
|
|
import traceback
|
|
import subprocess
|
|
try:
|
|
import queue
|
|
except ImportError:
|
|
import Queue as queue
|
|
|
|
from tlslite.handshakesettings import HandshakeSettings
|
|
from tlslite.constants import CipherSuite, ExtensionType, SignatureScheme, \
|
|
SignatureAlgorithm, HashAlgorithm, AlertDescription, AlertLevel, \
|
|
GroupName, ECPointFormat
|
|
from tlslite.extensions import TLSExtension, SNIExtension, \
|
|
RenegotiationInfoExtension, SignatureAlgorithmsExtension, \
|
|
SupportedGroupsExtension, StatusRequestExtension, \
|
|
ECPointFormatsExtension
|
|
from tlslite.utils.cryptomath import getRandomBytes
|
|
from tlslite.utils.keyfactory import parsePEMKey
|
|
from tlslite.x509 import X509
|
|
from tlslite.x509certchain import X509CertChain
|
|
from tlsfuzzer.messages import Connect, ClientHelloGenerator, \
|
|
CertificateGenerator, CertificateVerifyGenerator, \
|
|
ClientKeyExchangeGenerator, ChangeCipherSpecGenerator, \
|
|
FinishedGenerator, AlertGenerator, ResetRenegotiationInfo, \
|
|
ResetHandshakeHashes
|
|
from tlsfuzzer.expect import ExpectServerHello, ExpectCertificate, \
|
|
ExpectServerHelloDone, ExpectServerKeyExchange, \
|
|
ExpectCertificateRequest, ExpectChangeCipherSpec, ExpectFinished, \
|
|
ExpectAlert, ExpectClose
|
|
from tlsfuzzer.runner import Runner
|
|
|
|
# Shared queue of "<name>:<stream>:<line>" strings produced by the threads
# reading the server's stdout/stderr; drained and printed by flush_queue().
out = queue.Queue()
|
|
|
|
def help_msg():
    """Print the command-line usage of this script to standard output.

    Lists every option the script's option loop accepts (``-h``, ``-i``,
    ``-k``, ``-c``, ``-D``) and the optional positional test-case numbers.
    """
    print("Usage: <script-name> -i definitions.csv [-k key.pem] "
          "[-c cert.pem] [-D cert_dir] [test-case-number ...]")
    print(" -i file        name of the CSV file generated by ACTS with")
    print("                definitions of all the test cases to run")
    print(" -k file        PEM file with the private key used for client")
    print("                certificate authentication")
    print(" -c file        PEM file with the client certificate")
    print(" -D dir         directory with the server keys, certificates and")
    print("                DH parameter files (default: current directory)")
    print(" -h             print this message and exit")
    print(" test-case-number")
    print("                run only the listed (0-based) rows of the CSV file")
|
|
|
|
|
|
def skip_comments(f):
    """Skip the leading comment lines (starting with '#') of a CSV file.

    Advances the seekable file object *f* past every consecutive line at
    the current position whose first character is ``#`` and leaves the
    position at the start of the first non-comment line (or at end of
    file).  Works for files opened in both binary and text mode.

    :param f: seekable file object supporting tell/read/readline/seek
    """
    while True:
        pos = f.tell()
        first = f.read(1)
        # a text-mode file yields str while a binary one yields bytes, so
        # compare against a marker of the same type as what was read
        marker = b'#' if isinstance(first, bytes) else u'#'
        if first != marker:
            # not a comment (or EOF): un-read the character and stop
            f.seek(pos)
            break
        # comment: discard the remainder of the line
        f.readline()
|
|
|
|
|
|
# Translation of the cipher names used in the CSV file to tlslite-ng names
_CIPHER_NAMES = {"AES_128_CBC": "aes128",
                 "AES_256_CBC": "aes256",
                 "AES_128_GCM": "aes128gcm",
                 "AES_256_GCM": "aes256gcm",
                 "3DES_EDE_CBC": "3des",
                 "RC4": "rc4",
                 "Chacha20_Poly1305": "chacha20-poly1305"}

# Translation of the integrity names used in the CSV file to tlslite-ng names
_MAC_NAMES = {"AEAD": "aead",
              "MD5_HMAC": "md5",
              "SHA1_HMAC": "sha",
              "SHA256_HMAC": "sha256",
              "SHA384_HMAC": "sha384"}


def _bool_option(conf, key, template):
    """Return True/False for a "true"/"false" column, else raise ValueError.

    *template* is formatted with the column name and the offending value.
    """
    value = conf[key]
    if value == "true":
        return True
    if value == "false":
        return False
    raise ValueError(template.format(key, value))


def _suites_for(conf, prefix):
    """Return the cipher suites selected by the <prefix>Cipher, -Integrity,
    -Key_exchange and -Server_authentication columns of *conf*."""
    cipher = conf[prefix + 'Cipher']
    integrity = conf[prefix + 'Integrity']
    kex = conf[prefix + 'Key_exchange']
    auth = conf[prefix + 'Server_authentication']

    hs = HandshakeSettings()

    cipher_name = _CIPHER_NAMES.get(cipher)
    if cipher_name is None:
        raise ValueError("Unknown cipher type: {0}".format(cipher))
    hs.cipherNames = [cipher_name]

    mac_name = _MAC_NAMES.get(integrity)
    if mac_name is None:
        raise ValueError("Unknown integrity type: {0}".format(integrity))
    hs.macNames = [mac_name]

    # the anonymous variants must be checked before the certificate ones
    if kex == 'DHE' and auth == "anon":
        suites = CipherSuite.getAnonSuites(hs)
    elif kex == 'ECDHE' and auth == "anon":
        suites = CipherSuite.getEcdhAnonSuites(hs)
    elif kex == 'RSA':
        suites = CipherSuite.getCertSuites(hs)
    elif kex == 'DHE':
        suites = CipherSuite.getDheCertSuites(hs)
    elif kex == 'ECDHE':
        suites = CipherSuite.getEcdheCertSuites(hs)
    else:
        raise ValueError("Unknown key exchange type: {0}".format(kex))

    if not suites:
        raise ValueError("Couldn't find matching cipher for {0} {3} {1} {2}"
                         .format(kex, cipher, integrity, auth))
    return suites


def _session_id(conf, key, allow_resume=False):
    """Return the session ID to send for the column *key* of *conf*."""
    value = conf[key]
    if value == 'random':
        return getRandomBytes(16)
    if value == 'empty':
        return bytearray()
    if allow_resume and value == "resume":
        # presumably None makes ClientHelloGenerator reuse the session ID of
        # the previous handshake - confirm against tlsfuzzer documentation
        return None
    raise ValueError("Unknown {0} value: {1}".format(key, value))


def _compression(conf, key):
    """Return the list of compression methods for the column *key*."""
    value = conf[key]
    if value == 'null_only':
        return [0]
    if value == 'null_and_deflate':
        return [0, 1]
    raise ValueError("Unknown {0} value: {1}".format(key, value))


def _add_scsv(suites, conf, key):
    """Insert the renegotiation SCSV into *suites* as the column *key* asks."""
    value = conf[key]
    scsv = CipherSuite.TLS_EMPTY_RENEGOTIATION_INFO_SCSV
    if value == "first":
        suites.insert(0, scsv)
    elif value == "last":
        suites.append(scsv)
    elif value == "absent":
        pass
    elif value == "second":
        # SCSV followed by a GREASE value, so the SCSV is second to last
        suites.append(scsv)
        suites.append(0xeaea)  # GREASE
    else:
        raise ValueError("Unexpected {0} value: {1}".format(key, value))


def _rsa_signature_scheme(sig, context=""):
    """Translate a CSV signature scheme name to a tlslite-ng RSA scheme.

    (EC)DSA schemes are replaced with their RSA counterparts, as only RSA
    keys are used; *context* is appended to the message announcing that.
    """
    if "dsa" in sig:
        print("Changing {0} to RSA scheme{1}".format(sig, context))
        sig = sig.replace("ecdsa", "rsa")
        sig = sig.replace("dsa", "rsa")

    sig = sig.replace("rsa_sha", "rsa_pkcs1_sha")
    sig = sig.replace("rsapss", "rsa_pss")

    if "sha224" in sig:
        return (HashAlgorithm.sha224, SignatureAlgorithm.rsa)
    return getattr(SignatureScheme, sig)


def _build_extensions(conf, prefix, sni_hostname, initial):
    """Return the ClientHello extensions dict (or None) for one handshake.

    *prefix* is "" for the first and "H2" for the second handshake,
    *initial* selects an explicit empty renegotiation_info payload (first
    handshake) over None (second handshake).
    """
    if not _bool_option(conf, prefix + 'CH_extensions_present',
                        "Unexpected {0} value: {1}"):
        return None

    ext = dict()

    # session ticket
    if conf[prefix + 'CH_session_ticket'] != "no_ext":
        print("Not generating session ticket extension", file=sys.stderr)

    # renegotiation info
    if _bool_option(conf, prefix + 'CH_renegotiation_info_ext',
                    "Unknown option in {0}: {1}"):
        if initial:
            ext[ExtensionType.renegotiation_info] = \
                RenegotiationInfoExtension().create(bytearray())
        else:
            # presumably None lets tlsfuzzer generate the payload from the
            # connection state - confirm against tlsfuzzer documentation
            ext[ExtensionType.renegotiation_info] = None

    # signature algorithms
    if _bool_option(conf, prefix + 'CH_signature_algorithms_ext',
                    "Unknown option {0}: {1}"):
        sig = conf[prefix + 'SKE_signature_scheme']
        if sig == "none" or sig == "no_message":
            # no signature expected from server, advertise some default ones
            schemes = [SignatureScheme.rsa_pkcs1_sha256,
                       SignatureScheme.rsa_pss_sha256]
        else:
            schemes = [_rsa_signature_scheme(sig)]
        ext[ExtensionType.signature_algorithms] = \
            SignatureAlgorithmsExtension().create(schemes)

    # supported groups extension
    if _bool_option(conf, prefix + 'CH_supported_groups_ext',
                    "Unknown option in {0}: {1}"):
        dh_group = conf[prefix + 'SKE_dh_group']
        if dh_group == "no_message":
            groups = [GroupName.ffdhe2048, GroupName.secp256r1,
                      GroupName.x25519]
        elif dh_group == "ffdhe1024":
            groups = [GroupName.secp256r1, GroupName.x25519]
        else:
            groups = [getattr(GroupName, dh_group)]

        ext[ExtensionType.supported_groups] = \
            SupportedGroupsExtension().create(groups)

        ext[ExtensionType.ec_point_formats] = \
            ECPointFormatsExtension()\
            .create([ECPointFormat.uncompressed,
                     ECPointFormat.ansiX962_compressed_char2,
                     ECPointFormat.ansiX962_compressed_prime])
    else:
        # the extension is sent with a default group list even when the
        # test case asks for it to be absent
        groups = [GroupName.ffdhe2048, GroupName.secp256r1,
                  GroupName.x25519]
        ext[ExtensionType.supported_groups] = \
            SupportedGroupsExtension().create(groups)

    # encrypt then MAC
    if _bool_option(conf, prefix + 'CH_encrypt_then_mac_ext',
                    "Unknown option in {0}: {1}"):
        ext[ExtensionType.encrypt_then_mac] = \
            TLSExtension(extType=ExtensionType.encrypt_then_mac)\
            .create(bytearray(0))

    # server name
    server_name = conf[prefix + 'CH_server_name']
    if server_name == "no_ext":
        pass
    elif server_name == "correct":
        ext[ExtensionType.server_name] = \
            SNIExtension().create(sni_hostname)
    elif server_name == "mismatch":
        ext[ExtensionType.server_name] = \
            SNIExtension().create(sni_hostname + b'.www')
    else:
        raise ValueError("Unknown option in {0}: {1}"
                         .format(prefix + 'CH_server_name', server_name))

    # OCSP staple
    if _bool_option(conf, prefix + 'CH_status_request_ext',
                    "Unknown option in {0}: {1}"):
        ext[ExtensionType.status_request] = \
            StatusRequestExtension().create()

    # Extended Master Secret ext
    if _bool_option(conf, prefix + 'CH_extended_master_secret_ext',
                    "Unknown value in {0}: {1}"):
        ext[ExtensionType.extended_master_secret] = \
            TLSExtension(extType=ExtensionType.extended_master_secret)\
            .create(bytearray())

    return ext


def _client_hello(conf, prefix, sni_hostname, initial):
    """Return the ClientHelloGenerator for one of the two handshakes."""
    suites = _suites_for(conf, prefix)
    sess_id = _session_id(conf, prefix + 'CH_SessionID',
                          allow_resume=not initial)
    compress = _compression(conf, prefix + 'CH_compression')
    _add_scsv(suites, conf, prefix + 'CH_renegotiation_info_SCSV')
    ext = _build_extensions(conf, prefix, sni_hostname, initial)
    return ClientHelloGenerator(suites,
                                session_id=sess_id,
                                compression=compress,
                                extensions=ext)


def _expect_server_hello(node, conf, prefix, resume=False):
    """Append ExpectServerHello, preceded by an optional SNI warning alert."""
    mismatch = conf[prefix + 'CH_server_name'] == "mismatch"
    al_node = None
    if mismatch:
        node = node.add_child(ExpectAlert(AlertLevel.warning,
                                          AlertDescription.unrecognized_name))
        al_node = node
    if resume:
        node = node.add_child(ExpectServerHello(resume=True))
    else:
        node = node.add_child(ExpectServerHello())
    if mismatch:
        # make the sending of warning alert node optional
        al_node.next_sibling = node
    return node


def _full_handshake(node, conf, prefix, cert, key):
    """Append the messages of a full handshake that follow ServerHello."""
    node = node.add_child(ExpectCertificate())
    # TODO if conf['Certificate_Status_msg']
    if conf[prefix + 'SKE_dh_group'] != "no_message":
        node = node.add_child(ExpectServerKeyExchange())

    cr_sent = conf[prefix + 'CR_sent']
    if cr_sent == "true":
        node = node.add_child(ExpectCertificateRequest())
    elif cr_sent != "false":
        raise ValueError("Unknown option in {0}: {1}"
                         .format(prefix + 'CR_sent', cr_sent))
    node = node.add_child(ExpectServerHelloDone())

    cv_scheme = conf[prefix + 'CV_signature_scheme']
    if cr_sent == "true":
        if cv_scheme == "no_message":
            # empty Certificate message: client declines to authenticate
            node = node.add_child(CertificateGenerator())
        else:
            node = node.add_child(CertificateGenerator(X509CertChain([cert])))
    node = node.add_child(ClientKeyExchangeGenerator())

    if cv_scheme != "no_message":
        scheme = _rsa_signature_scheme(cv_scheme, " in CV")
        node = node.add_child(CertificateVerifyGenerator(key, msg_alg=scheme))

    node = node.add_child(ChangeCipherSpecGenerator())
    node = node.add_child(FinishedGenerator())
    node = node.add_child(ExpectChangeCipherSpec())
    node = node.add_child(ExpectFinished())
    return node


def _close_notify(node):
    """Append a close_notify exchange; the peer may also just close."""
    node = node.add_child(AlertGenerator(AlertLevel.warning,
                                         AlertDescription.close_notify))
    node = node.add_child(ExpectAlert(AlertLevel.warning,
                                      AlertDescription.close_notify))
    node.next_sibling = ExpectClose()
    return node


def conv_generator(conf, host, port, sni_hostname, cert=None, key=None):
    """Generate a conversation based on dict with configuration.

    The conversation consists of two handshakes: the first one is described
    by the plain columns of *conf* and the second one by the columns
    prefixed with ``H2``.  When ``conf['Disconnect']`` is "true" the first
    connection is closed and a new one opened before the second handshake.

    NOTE(review): the nesting of three sections was inferred from semantics
    because the source was flattened - the extension handling is nested
    under the "extensions present" branch, ec_point_formats is sent only
    together with the "true" supported_groups branch, and handshake hashes
    are reset regardless of ``Disconnect``.  Confirm against upstream.

    :param conf: dict (row of the CSV file) with the test case definition
    :param host: host name or address of the server to connect to
    :param port: TCP port of the server
    :param sni_hostname: name to place in the server_name extension
    :param cert: client certificate, used when the server requests one
    :param key: private key matching *cert*, used for CertificateVerify
    :return: root node (Connect) of the conversation graph
    :raises ValueError: when *conf* has an unrecognised value in a column
    """
    # SNIExtension expects raw bytes; a text host name would also fail when
    # concatenated with b'.www' on Python 3
    if isinstance(sni_hostname, type(u'')):
        sni_hostname = bytearray(sni_hostname.encode('ascii'))

    root = Connect(host, port)

    # only RSA is supported
    if conf['Server_authentication'] not in ("RSA", "anon"):
        print("Substituting {0} to RSA for server auth"
              .format(conf['Server_authentication']), file=sys.stderr)

    # first handshake, always a full one
    node = root.add_child(_client_hello(conf, '', sni_hostname, initial=True))
    node = _expect_server_hello(node, conf, '')
    node = _full_handshake(node, conf, '', cert, key)

    if conf['Disconnect'] == "true":
        # close the connection and start over on a fresh one
        node = _close_notify(node)
        node = node.add_child(node.next_sibling)
        node = node.add_child(Connect(host, port))
        node = node.add_child(ResetRenegotiationInfo())
    node = node.add_child(ResetHandshakeHashes())

    # second handshake: new connection or renegotiation
    node = node.add_child(_client_hello(conf, 'H2', sni_hostname,
                                        initial=False))
    if conf['H2SH_SessionID'] == "resume":
        print("doing resumption")
        node = _expect_server_hello(node, conf, 'H2', resume=True)
        # abbreviated handshake: server finishes first
        node = node.add_child(ExpectChangeCipherSpec())
        node = node.add_child(ExpectFinished())
        node = node.add_child(ChangeCipherSpecGenerator())
        node = node.add_child(FinishedGenerator())
    else:
        node = _expect_server_hello(node, conf, 'H2')
        node = _full_handshake(node, conf, 'H2', cert, key)
    _close_notify(node)

    return root
|
|
|
|
def process_stdout(name, proc):
    """Copy the standard output of *proc* to the shared ``out`` queue.

    Reads ``proc.stdout`` line by line until ``readline()`` returns ``b''``
    and queues every line, decoded and stripped of trailing whitespace,
    as ``"<name>:stdout:<line>"``.
    """
    stream = proc.stdout
    while True:
        raw = stream.readline()
        if raw == b'':
            break
        text = raw.decode().rstrip()
        out.put("{0}:stdout:{1}".format(name, text))
|
|
|
|
|
|
def process_stderr(name, proc):
    """Copy the standard error of *proc* to the shared ``out`` queue.

    Reads ``proc.stderr`` line by line until ``readline()`` returns ``b''``
    and queues every line, decoded and stripped of trailing whitespace,
    as ``"<name>:stderr:<line>"``.
    """
    stream = proc.stderr
    while True:
        raw = stream.readline()
        if raw == b'':
            break
        text = raw.decode().rstrip()
        out.put("{0}:stderr:{1}".format(name, text))
|
|
|
|
|
|
def wait_till_open(host, port, timeout=10):
    """Wait until a TCP connection to (*host*, *port*) can be established.

    Connection attempts are repeated until one succeeds or *timeout*
    seconds have passed.  Every probing socket is closed again, so no
    connection to the server is left open.

    :param host: host name or IPv4 address to connect to
    :param port: TCP port to connect to
    :param timeout: how long, in seconds, to keep trying (default 10)
    :raises ValueError: when no attempt succeeded within *timeout*
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        try:
            sock.connect((host, port))
            return
        except socket.error:
            pass
        finally:
            sock.close()
        # port not open yet; pause briefly instead of spinning on the CPU
        time.sleep(0.05)
    raise ValueError("Can't connect to server")
|
|
|
|
|
|
def flush_queue():
    """Print, without blocking, every line currently held in ``out``."""
    try:
        while True:
            print(out.get(False))
    except queue.Empty:
        # nothing more to print
        pass
|
|
|
|
|
|
def start_server(server_cmd, server_env=tuple(), server_host=None,
                 server_port=4433):
    """Start the server process and wait until it accepts connections.

    The output of the process is forwarded to the shared ``out`` queue by
    two daemon threads running process_stdout() and process_stderr().

    :param server_cmd: command line of the server, passed to Popen
    :param server_env: additional environment variables for the server, in
        any form accepted by ``dict.update()``
    :param server_host: host on which the server is expected to listen,
        "localhost" when None
    :param server_port: TCP port on which the server is expected to listen
    :return: tuple of the Popen object, the stdout reader thread and the
        stderr reader thread
    :raises ValueError: when the server port did not open in time; the
        server process is terminated and its output printed first
    """
    if server_host is None:
        server_host = "localhost"
    my_env = os.environ.copy()
    my_env.update(server_env)
    ret = subprocess.Popen(server_cmd, env=my_env,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           bufsize=1)
    thr_stdout = threading.Thread(target=process_stdout, args=('server', ret))
    thr_stdout.daemon = True
    thr_stdout.start()
    thr_stderr = threading.Thread(target=process_stderr, args=('server', ret))
    thr_stderr.daemon = True
    thr_stderr.start()
    try:
        wait_till_open(server_host, server_port)
    except ValueError:
        # don't leave an orphaned server behind: stop it and collect all of
        # its output before reporting the failure
        ret.terminate()
        ret.wait()
        thr_stdout.join()
        thr_stderr.join()
        flush_queue()
        raise
    return ret, thr_stdout, thr_stderr
|
|
|
|
|
|
def server_config(conf, cert_dir):
    """Build the ``openssl s_server`` command line for one test case.

    :param conf: dict (row of the CSV file) with the test case definition
    :param cert_dir: directory with the ``<name>.key``, ``<name>.crt`` and
        ``<group>.pem`` files referenced by the test case
    :return: list of command line arguments, starting with the executable
    """
    argv = ["/usr/bin/openssl", "s_server", "-www"]

    # only RSA keys are available: ECDSA becomes the 2048 bit RSA key and
    # DSA the RSA key of the same name
    key_name = conf['Server_key']
    if "ecdsa" in key_name:
        key_name = "rsa2048"
    key_name = key_name.replace("dsa", "rsa")

    if key_name != "none":
        argv.extend(["-key", "/".join([cert_dir, key_name + ".key"]),
                     "-cert", "/".join([cert_dir, key_name + ".crt"])])
    else:
        argv.append("-nocert")

    if conf['CR_sent'] == "true":
        argv.extend(["-verify", "1"])

    # XXX assuming that if second handshake is also using DHE group
    # that it is the same group
    group = conf['SKE_dh_group']
    if "ffdhe" in group:
        argv.extend(["-dhparam", "/".join([cert_dir, group + ".pem"])])
    # selecting the curve with "-named_curve" is only really needed for
    # OpenSSL 1.0.1, later versions select the curve automatically

    if conf['SH_session_ticket_ext'] == "false":
        argv.append("-no_ticket")

    if conf['SH_compression'] == "deflate":
        argv.append("-comp")

    # client asks to resume but server is expected to start a new session
    if conf['H2CH_SessionID'] == "resume" and \
            conf['H2SH_SessionID'] != "resume":
        argv.append("-no_cache")

    # enable legacy renego
    # NOTE(review): "and" binds tighter than "or", so the Disconnect check
    # guards only the first handshake's condition - confirm this is intended
    if (conf['Disconnect'] == "false" and
            conf['CH_renegotiation_info_SCSV'] == "absent" and
            conf['CH_renegotiation_info_ext'] == "false") or \
            (conf['H2CH_renegotiation_info_SCSV'] == "absent" and
             conf['H2CH_renegotiation_info_ext'] == "false"):
        argv.append("-legacy_renegotiation")

    # TODO OCSP stapling

    return argv
|
|
|
|
|
|
# Script body: parse the command line, then for every selected row of the
# CSV file build the conversation, start a matching server, run the
# conversation against it and record the result.
argv = sys.argv[1:]

test_data = None
cert = None
key = None
run_only = set()
cert_dir = "."

opts, args = getopt.getopt(argv, "hi:k:c:D:")
for opt, arg in opts:
    if opt == "-h":
        help_msg()
        # the os module has no exit(); sys.exit() is the one that exists
        sys.exit(0)
    elif opt == "-i":
        test_data = arg
    elif opt == "-k":
        with open(arg, 'rb') as key_file:
            text_key = key_file.read()
        if sys.version_info[0] >= 3:
            text_key = str(text_key, 'utf-8')
        key = parsePEMKey(text_key, private=True)
    elif opt == "-c":
        with open(arg, 'rb') as cert_file:
            text_cert = cert_file.read()
        if sys.version_info[0] >= 3:
            text_cert = str(text_cert, 'utf-8')
        cert = X509()
        cert.parse(text_cert)
    elif opt == "-D":
        cert_dir = arg
    else:
        raise ValueError("Unexpected option: {0}".format(opt))

# positional arguments are the numbers of the test cases to run
if args:
    for arg in args:
        run_only.add(int(arg))

if test_data is None:
    raise ValueError("No file specified")

# the reader pulls rows lazily, so the whole loop must stay inside "with"
with open(test_data) as f:
    skip_comments(f)
    reader = csv.DictReader(f)

    good = 0
    bad = 0
    skip = 0
    failed = []
    skipped = []

    for i, row in enumerate(reader):
        if run_only and i not in run_only:
            continue
        print("Processing TC#: {0}".format(i))
        conv = conv_generator(row, 'localhost', 4433, 'localhost', cert, key)
        if conv is None:
            skip += 1
            skipped.append(i)
            continue

        print("starting server...")
        try:
            params = server_config(row, cert_dir)
            srv, srv_out, srv_err = start_server(params, [], "localhost", 4433)

            print("...")
            runner = Runner(conv)
            res = True
            try:
                runner.run()
            except Exception:
                # any failure of the conversation marks the test case as
                # failed; the traceback is printed for diagnosis
                print("Error while processing")
                print(traceback.format_exc())
                res = False

            # 15 is SIGTERM
            srv.send_signal(15)
            srv.wait()
            print("Server process killed: {0}".format(srv.returncode))

            # reader threads end once the server closes its pipes
            srv_out.join()
            srv_err.join()
            flush_queue()

            if res:
                good += 1
                print("OK\n")
            else:
                bad += 1
                failed.append(i)
                print(sorted(row.items()))
        finally:
            flush_queue()
            print()

print("Test end")
print("successful: {0}".format(good))
print("skipped: {0}".format(skip))
print(" {0}".format("\n ".join(str(i) for i in skipped)))
print("failed: {0}".format(bad))
print(" {0}".format("\n ".join(str(i) for i in failed)))
|