#!/usr/bin/env python # Authors: # Trevor Perrin # Kees Bos - Added tests for XML-RPC # Dimitris Moraitis - Anon ciphersuites # Marcelo Fernandez - Added test for NPN # Martin von Loewis - python 3 port # Hubert Kario - several improvements # Google - FALLBACK_SCSV test # Efthimis Iosifidis - improvemnts of time measurement in Throughput Test # # # See the LICENSE file for legal information regarding use of this file. from __future__ import print_function import sys import os import os.path import socket import time import timeit import getopt from tempfile import mkstemp try: from BaseHTTPServer import HTTPServer from SimpleHTTPServer import SimpleHTTPRequestHandler except ImportError: from http.server import HTTPServer, SimpleHTTPRequestHandler from tlslite import TLSConnection, Fault, HandshakeSettings, \ X509, X509CertChain, IMAP4_TLS, VerifierDB, Session, SessionCache, \ parsePEMKey, constants, \ AlertDescription, HTTPTLSConnection, TLSSocketServerMixIn, \ POP3_TLS, m2cryptoLoaded, pycryptoLoaded, gmpyLoaded, tackpyLoaded, \ Checker, __version__ from tlslite.handshakesettings import VirtualHost, Keypair from tlslite.errors import * from tlslite.utils.cryptomath import prngName, getRandomBytes try: import xmlrpclib except ImportError: # Python 3 from xmlrpc import client as xmlrpclib import ssl from tlslite import * from tlslite.constants import KeyUpdateMessageType try: from tack.structures.Tack import Tack except ImportError: pass def printUsage(s=None): if m2cryptoLoaded: crypto = "M2Crypto/OpenSSL" else: crypto = "Python crypto" if s: print("ERROR: %s" % s) print("""\ntls.py version %s (using %s) Commands: server HOST:PORT DIRECTORY client HOST:PORT DIRECTORY """ % (__version__, crypto)) sys.exit(-1) def testConnClient(conn): b1 = os.urandom(1) b10 = os.urandom(10) b100 = os.urandom(100) b1000 = os.urandom(1000) conn.write(b1) conn.write(b10) conn.write(b100) conn.write(b1000) r1 = conn.read(min=1, max=1) assert len(r1) == 1 assert r1 == b1 r10 = conn.read(min=10, max=10) assert len(r10) == 10 assert r10 == b10 r100 = conn.read(min=100, max=100) assert len(r100) == 100 assert r100 == b100 r1000 = conn.read(min=1000, max=1000) assert len(r1000) == 1000 assert r1000 == b1000 def clientTestCmd(argv): address = argv[0] dir = argv[1] #Split address into hostname/port tuple address = address.split(":") address = ( address[0], int(address[1]) ) #open synchronisation FIFO synchro = socket.socket(socket.AF_INET, socket.SOCK_STREAM) synchro.settimeout(60) synchro.connect((address[0], address[1]-1)) def connect(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(15) sock.connect(address) c = TLSConnection(sock) return c test_no = 0 badFault = False print("Test {0} - anonymous handshake".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientAnonymous(settings=settings) testConnClient(connection) connection.close() test_no += 1 print("Test {0} - good X.509 (plus SNI)".format(test_no)) synchro.recv(1) connection = connect() connection.handshakeClientCert(serverName=address[0]) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites) assert(connection.encryptThenMAC == False) assert connection.session.appProto is None connection.close() test_no += 1 print("Test {0} - good X.509 TLSv1.2 (plus ALPN)".format(test_no)) synchro.recv(1) settings = HandshakeSettings() settings.maxVersion = (3, 3) connection = connect() connection.handshakeClientCert(serverName=address[0], alpn=[b'http/1.1'], settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverName == address[0] assert connection.session.cipherSuite in constants.CipherSuite.aeadSuites assert connection.encryptThenMAC == False assert connection.session.appProto == b'http/1.1' connection.close() test_no += 1 print("Test {0} - good X.509 TLSv1.3 (plus ALPN)".format(test_no)) synchro.recv(1) connection = connect() connection.handshakeClientCert(serverName=address[0], alpn=[b'http/1.1']) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverName == address[0] assert connection.session.cipherSuite in constants.CipherSuite.aeadSuites assert connection.encryptThenMAC == False assert connection.session.appProto == b'http/1.1' connection.close() test_no += 1 print("Test {0} - good X.509/w RSA-PSS sig".format(test_no)) synchro.recv(1) connection = connect() connection.handshakeClientCert(serverName=address[0]) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites) assert(connection.encryptThenMAC == False) connection.close() test_no += 1 print("Test {0} - good X.509/w RSA-PSS cert".format(test_no)) synchro.recv(1) connection = connect() connection.handshakeClientCert(serverName=address[0]) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites) assert(connection.encryptThenMAC == False) connection.close() test_no += 1 print("Test {0} - good X.509/w RSA-PSS cert in TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeClientCert(serverName=address[0], settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(connection.session.cipherSuite in constants.CipherSuite.aeadSuites) assert(connection.encryptThenMAC == False) connection.close() test_no += 1 print("Test {0} - good X.509, small record_size_limit".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.record_size_limit = 64 connection.handshakeClientCert(settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good X.509, SSLv3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3,0) settings.maxVersion = (3,0) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 0) settings.maxVersion = (3, 0) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.ecdheEcdsaSuites assert isinstance(connection.session.serverCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 1) settings.maxVersion = (3, 1) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.ecdheEcdsaSuites assert isinstance(connection.session.serverCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.ecdheEcdsaSuites assert isinstance(connection.session.serverCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.eccCurves = ["secp384r1"] settings.keyShares = [] try: connection.handshakeClientCert(settings=settings) assert False except TLSRemoteAlert as e: assert "handshake_failure" in str(e) connection.close() test_no += 1 for curve, keySize in (("brainpoolP256r1", 256), ("brainpoolP384r1", 384), ("brainpoolP512r1", 512)): print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2".format(test_no, curve)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.eccCurves = [curve] settings.keyShares = [] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \ == keySize connection.close() test_no += 1 print("Test {0} - Two good ECDSA certs - secp256r1, TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.eccCurves = ["secp256r1"] settings.keyShares = [] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \ == 256 connection.close() test_no += 1 print("Test {0} - Two good ECDSA certs - secp384r1, TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.eccCurves = ["secp384r1"] settings.keyShares = [] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \ == 384 connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, TLSv1.2" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.rsaSigHashes = ["sha256"] settings.ecdsaSigHashes = ["sha256"] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "rsa" assert connection.version == (3, 3) connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, TLSv1.2" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.rsaSigHashes = ["sha384"] settings.ecdsaSigHashes = ["sha256"] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "ecdsa" assert connection.version == (3, 3) connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, TLSv1.2" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.rsaSigHashes = ["sha384"] settings.ecdsaSigHashes = ["sha384"] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "rsa" assert connection.version == (3, 3) connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, TLSv1.3" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) settings.rsaSigHashes = ["sha256"] settings.ecdsaSigHashes = ["sha256"] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "rsa" assert connection.version == (3, 4) connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, TLSv1.3" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) settings.rsaSigHashes = ["sha384"] settings.ecdsaSigHashes = ["sha256"] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "ecdsa" assert connection.version == (3, 4) connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, TLSv1.3" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) settings.rsaSigHashes = ["sha384"] settings.ecdsaSigHashes = ["sha384"] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "rsa" assert connection.version == (3, 4) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.tls13Suites assert isinstance(connection.session.serverCertChain, X509CertChain) assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \ == 256 connection.close() test_no += 1 print("Test {0} - mismatched ECDSA curve, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) settings.ecdsaSigHashes = ["sha384", "sha512"] try: connection.handshakeClientCert(settings=settings) assert False except TLSRemoteAlert as e: assert "handshake_failure" in str(e) connection.close() test_no += 1 print("Test {0} - good X.509 P-384 ECDSA, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.tls13Suites assert isinstance(connection.session.serverCertChain, X509CertChain) assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \ == 384 connection.close() test_no += 1 print("Test {0} - good X.509 P-521 ECDSA, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.tls13Suites assert isinstance(connection.session.serverCertChain, X509CertChain) assert len(connection.session.serverCertChain.getEndEntityPublicKey()) \ == 521 connection.close() test_no += 1 print("Test {0} - good X.509 Ed25519, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.tls13Suites assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good X.509 Ed448, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.tls13Suites assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \ == "Ed448" connection.close() test_no += 1 print("Test {0} - good RSA and ECDSA, TLSv1.3, rsa" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.tls13Suites assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "rsa" assert connection.version == (3, 4) connection.close() test_no += 1 print("Test {0} - good RSA and ECDSA, TLSv1.3, ecdsa" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) settings.rsaSigHashes = [] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.tls13Suites assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "ecdsa" assert connection.version == (3, 4) connection.close() test_no += 1 print("Test {0} - good RSA and ECDSA, TLSv1.2, rsa" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.ecdheCertSuites, connection.session.cipherSuite assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "rsa" assert connection.version == (3, 3) connection.close() test_no += 1 print("Test {0} - good RSA and ECDSA, TLSv1.2, ecdsa" .format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.rsaSigHashes = [] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.ecdheEcdsaSuites, connection.session.cipherSuite assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "ecdsa" assert connection.version == (3, 3) connection.close() test_no += 1 print("Test {0} - good X.509, mismatched key_share".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.keyShares = ["x25519"] connection.handshakeClientCert(settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good X.509, RC4-MD5".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.macNames = ["md5"] settings.cipherNames = ["rc4"] settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.cipherSuite == constants.CipherSuite.TLS_RSA_WITH_RC4_128_MD5) assert(connection.encryptThenMAC == False) connection.close() if tackpyLoaded: settings = HandshakeSettings() settings.useExperimentalTackExtension = True settings.maxVersion = (3, 3) test_no += 1 print("Test {0} - good X.509, TACK".format(test_no)) synchro.recv(1) connection = connect() connection.handshakeClientCert(settings=settings) assert(connection.session.tackExt.tacks[0].getTackId() == "5lcbe.eyweo.yxuan.rw6xd.jtoz7") assert(connection.session.tackExt.activation_flags == 1) testConnClient(connection) connection.close() test_no += 1 print("Test {0} - good X.509, TACK unrelated to cert chain".\ format(test_no)) synchro.recv(1) connection = connect() try: connection.handshakeClientCert(settings=settings) assert False except TLSLocalAlert as alert: if alert.description != AlertDescription.illegal_parameter: raise connection.close() else: test_no += 1 print("Test {0} - good X.509, TACK...skipped (no tackpy)".\ format(test_no)) test_no += 1 print("Test {0} - good X.509, TACK unrelated to cert chain...skipped" " (no tackpy)".\ format(test_no)) test_no += 1 print("Test {0} - good PSK".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')] connection.handshakeClientCert(settings=settings) assert connection.session.serverCertChain is None assert connection.ecdhCurve is not None testConnClient(connection) connection.close() test_no += 1 print("Test {0} - good PSK, no DH".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.psk_modes = ["psk_ke"] settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')] connection.handshakeClientCert(settings=settings) assert connection.session.serverCertChain is None assert connection.ecdhCurve is None testConnClient(connection) connection.close() test_no += 1 print("Test {0} - good PSK, no DH, no cert".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.psk_modes = ["psk_ke"] settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')] connection.handshakeClientCert(settings=settings) assert connection.session.serverCertChain is None assert connection.ecdhCurve is None testConnClient(connection) connection.close() test_no += 1 print("Test {0} - good SRP (db)".format(test_no)) print("client {0} - waiting for synchro".format(time.time())) try: synchro.recv(1) except Exception: print("client {0} - wait abort".format(time.time())) raise print("client {0} - synchro received".format(time.time())) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientSRP("test", "password", settings=settings) testConnClient(connection) connection.close() test_no += 1 print("Test {0} - good SRP".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientSRP("test", "password", settings=settings) testConnClient(connection) connection.close() test_no += 1 print("Test {0} - SRP faults".format(test_no)) for fault in Fault.clientSrpFaults + Fault.genericFaults: synchro.recv(1) connection = connect() connection.fault = fault settings = HandshakeSettings() settings.maxVersion = (3, 3) try: connection.handshakeClientSRP("test", "password", settings=settings) print(" Good Fault %s" % (Fault.faultNames[fault])) except TLSFaultError as e: print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) badFault = True test_no += 1 print("Test {0} - good SRP: with X.509 certificate, TLSv1.0".format(test_no)) settings = HandshakeSettings() settings.minVersion = (3,1) settings.maxVersion = (3,1) synchro.recv(1) connection = connect() connection.handshakeClientSRP("test", "password", settings=settings) assert(isinstance(connection.session.serverCertChain, X509CertChain)) testConnClient(connection) connection.close() test_no += 1 print("Test {0} - X.509 with SRP faults".format(test_no)) for fault in Fault.clientSrpFaults + Fault.genericFaults: synchro.recv(1) connection = connect() connection.fault = fault settings = HandshakeSettings() settings.maxVersion = (3, 3) try: connection.handshakeClientSRP("test", "password", settings=settings) print(" Good Fault %s" % (Fault.faultNames[fault])) except TLSFaultError as e: print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) badFault = True test_no += 1 print("Test {0} - X.509 faults".format(test_no)) for fault in Fault.clientNoAuthFaults + Fault.genericFaults: synchro.recv(1) connection = connect() connection.fault = fault try: connection.handshakeClientCert() print(" Good Fault %s" % (Fault.faultNames[fault])) except TLSFaultError as e: print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) badFault = True test_no += 1 print("Test {0} - good mutual X.509".format(test_no)) x509Cert = X509().parse(open(os.path.join(dir, "clientX509Cert.pem")).read()) x509Chain = X509CertChain([x509Cert]) s = open(os.path.join(dir, "clientX509Key.pem")).read() x509Key = parsePEMKey(s, private=True) synchro.recv(1) connection = connect() connection.handshakeClientCert(x509Chain, x509Key) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good mutual ECDSA X.509".format(test_no)) with open(os.path.join(dir, "clientECCert.pem")) as f: x509Cert = X509().parse(f.read()) x509Chain = X509CertChain([x509Cert]) with open(os.path.join(dir, "clientECKey.pem")) as f: x509Key = parsePEMKey(f.read(), private=True) synchro.recv(1) connection = connect() connection.handshakeClientCert(x509Chain, x509Key) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert len(connection.session.serverCertChain.getEndEntityPublicKey()) ==\ 256 connection.close() test_no += 1 print("Test {0} - good mutual Ed25519 X.509".format(test_no)) with open(os.path.join(dir, "clientEd25519Cert.pem")) as f: x509EdCert = X509().parse(f.read()) x509EdChain = X509CertChain([x509EdCert]) with open(os.path.join(dir, "clientEd25519Key.pem")) as f: x509EdKey = parsePEMKey(f.read(), private=True) synchro.recv(1) connection = connect() connection.handshakeClientCert(x509EdChain, x509EdKey) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good mutual Ed25519 X.509, TLS 1.2".format(test_no)) with open(os.path.join(dir, "clientEd25519Cert.pem")) as f: x509EdCert = X509().parse(f.read()) x509EdChain = X509CertChain([x509EdCert]) with open(os.path.join(dir, "clientEd25519Key.pem")) as f: x509EdKey = parsePEMKey(f.read(), private=True) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeClientCert(x509EdChain, x509EdKey, settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good X.509 DSA, SSLv3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 0) settings.maxVersion = (3, 0) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.dheDsaSuites assert isinstance(connection.session.serverCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good X.509 DSA, TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.dheDsaSuites assert isinstance(connection.session.serverCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good X.509 Ed25519, TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.ecdheEcdsaSuites assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good X.509 Ed448, TLSv1.2".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert connection.session.cipherSuite in\ constants.CipherSuite.ecdheEcdsaSuites assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type \ == "Ed448" connection.close() test_no += 1 print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3,4) settings.maxVersion = (3,4) connection.handshakeClientCert(settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good mutual X.509, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3,4) settings.maxVersion = (3,4) connection.handshakeClientCert(x509Chain, x509Key, settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good mutual X.509, PHA, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(x509Chain, x509Key, settings=settings) synchro.recv(1) b = connection.read(0, 0) assert b == b'' testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good mutual X.509 Ed25519, PHA, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(x509EdChain, x509EdKey, settings=settings) synchro.recv(1) b = connection.read(0, 0) assert b == b'' testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverCertChain.getEndEntityPublicKey().key_type\ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good mutual X.509, PHA and KeyUpdate, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(x509Chain, x509Key, settings=settings) for result in connection.send_keyupdate_request( KeyUpdateMessageType.update_requested): assert result in (0, 1) synchro.recv(1) b = connection.read(0, 0) assert b == b'' testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - mutual X.509, PHA, no client cert, TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeClientCert(X509CertChain(), x509Key, settings=settings) synchro.recv(1) b = connection.read(0, 0) assert b == b'' try: connection.read(0, 0) assert False except TLSRemoteAlert as e: assert e.description == AlertDescription.certificate_required assert "certificate_required" in str(e), str(e) connection.close() test_no += 1 print("Test {0} - good mutual X.509, TLSv1.1".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3,2) settings.maxVersion = (3,2) connection.handshakeClientCert(x509Chain, x509Key, settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good mutual X.509, SSLv3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3,0) settings.maxVersion = (3,0) connection.handshakeClientCert(x509Chain, x509Key, settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - mutual X.509 faults".format(test_no)) for fault in Fault.clientCertFaults + Fault.genericFaults: synchro.recv(1) connection = connect() connection.fault = fault try: connection.handshakeClientCert(x509Chain, x509Key) print(" Good Fault %s" % (Fault.faultNames[fault])) except TLSFaultError as e: print(" BAD FAULT %s: %s" % (Fault.faultNames[fault], str(e))) badFault = True test_no += 1 print("Test {0} - good SRP, prepare to resume... (plus SNI)".\ format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientSRP("test", "password", serverName=address[0], settings=settings) testConnClient(connection) connection.close() session = connection.session test_no += 1 print("Test {0} - resumption (plus SNI)".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientSRP("test", "garbage", serverName=address[0], session=session, settings=settings) testConnClient(connection) #Don't close! -- see below test_no += 1 print("Test {0} - invalidated resumption (plus SNI)".format(test_no)) synchro.recv(1) connection.sock.close() #Close the socket without a close_notify! synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) try: connection.handshakeClientSRP("test", "garbage", serverName=address[0], session=session, settings=settings) assert False except TLSRemoteAlert as alert: if alert.description != AlertDescription.bad_record_mac: raise connection.close() test_no += 1 print("Test {0} - HTTPS test X.509".format(test_no)) address = address[0], address[1]+1 if hasattr(socket, "timeout"): timeoutEx = socket.timeout else: timeoutEx = socket.error while 1: try: htmlBody = bytearray(open(os.path.join(dir, "index.html")).read(), "utf-8") fingerprint = None for y in range(2): checker =Checker(x509Fingerprint=fingerprint) h = HTTPTLSConnection(\ address[0], address[1], checker=checker) for x in range(3): synchro.recv(1) h.request("GET", "/index.html") r = h.getresponse() assert(r.status == 200) b = bytearray(r.read()) assert(b == htmlBody) fingerprint = h.tlsSession.serverCertChain.getFingerprint() assert(fingerprint) break except timeoutEx: print("timeout, retrying...") pass address = address[0], address[1]+1 implementations = [] if m2cryptoLoaded: implementations.append("openssl") if pycryptoLoaded: implementations.append("pycrypto") implementations.append("python") test_no += 1 print("Test {0} - different ciphers, TLSv1.0".format(test_no)) for implementation in implementations: for cipher in ["aes128", "aes256", "rc4"]: test_no += 1 print("Test {0}:".format(test_no), end=' ') synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.cipherNames = [cipher] settings.cipherImplementations = [implementation, "python"] settings.minVersion = (3,1) settings.maxVersion = (3,1) connection.handshakeClientCert(settings=settings) testConnClient(connection) print("%s %s" % (connection.getCipherName(), connection.getCipherImplementation())) connection.close() test_no += 1 print("Test {0} - throughput test".format(test_no)) for implementation in implementations: for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8", "aes128gcm", "aes256gcm", "aes128", "aes256", "3des", "rc4", "chacha20-poly1305_draft00", "chacha20-poly1305"]: # skip tests with implementations that don't support them if cipher == "3des" and implementation not in ("openssl", "pycrypto"): continue if cipher in ("aes128gcm", "aes256gcm") and \ implementation not in ("pycrypto", "python", "openssl"): continue if cipher in ("aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8") and \ implementation not in ("python", "openssl"): continue if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \ and implementation not in ("python", ): continue test_no += 1 print("Test {0}:".format(test_no), end=' ') synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.cipherNames = [cipher] settings.cipherImplementations = [implementation, "python"] if cipher not in ("aes128ccm", "aes128ccm_8", "aes128gcm", "aes256gcm", "chacha20-poly1305"): settings.maxVersion = (3, 3) connection.handshakeClientCert(settings=settings) print("%s %s:" % (connection.getCipherName(), connection.getCipherImplementation()), end=' ') startTime = timeit.default_timer() connection.write(b"hello"*10000) h = connection.read(min=50000, max=50000) stopTime = timeit.default_timer() sizeofdata = len(h)*2 if stopTime-startTime: print("100K exchanged at rate of %d bytes/sec" % int(sizeofdata/(stopTime-startTime))) else: print("100K exchanged very fast") assert(h == b"hello"*10000) connection.close() test_no += 1 print("Test {0} - Next-Protocol Client Negotiation".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(nextProtos=[b"http/1.1"], settings=settings) #print(" Next-Protocol Negotiated: %s" % connection.next_proto) assert(connection.next_proto == b'http/1.1') connection.close() test_no += 1 print("Test {0} - Next-Protocol Client Negotiation".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"], settings=settings) #print(" Next-Protocol Negotiated: %s" % connection.next_proto) assert(connection.next_proto == b'spdy/2') connection.close() test_no += 1 print("Test {0} - Next-Protocol Client Negotiation".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"], settings=settings) #print(" Next-Protocol Negotiated: %s" % connection.next_proto) assert(connection.next_proto == b'spdy/2') connection.close() test_no += 1 print("Test {0} - Next-Protocol Client Negotiation".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"], settings=settings) #print(" Next-Protocol Negotiated: %s" % connection.next_proto) assert(connection.next_proto == b'spdy/2') connection.close() test_no += 1 print("Test {0} - Next-Protocol Client Negotiation".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(nextProtos=[b"spdy/3", b"spdy/2", b"http/1.1"], settings=settings) #print(" Next-Protocol Negotiated: %s" % connection.next_proto) assert(connection.next_proto == b'spdy/3') connection.close() test_no += 1 print("Test {0} - Next-Protocol Client Negotiation".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(nextProtos=[b"http/1.1"], settings=settings) #print(" Next-Protocol Negotiated: %s" % connection.next_proto) assert(connection.next_proto == b'http/1.1') connection.close() test_no += 1 print("Test {0} - Next-Protocol Client Negotiation".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(nextProtos=[b"spdy/2", b"http/1.1"], settings=settings) #print(" Next-Protocol Negotiated: %s" % connection.next_proto) assert(connection.next_proto == b'spdy/2') connection.close() test_no += 1 print("Test {0} - FALLBACK_SCSV".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.sendFallbackSCSV = True settings.maxVersion = (3, 3) # TODO fix FALLBACK_SCSV with TLS 1.3 connection.handshakeClientCert(settings=settings) testConnClient(connection) connection.close() test_no += 1 print("Test {0} - FALLBACK_SCSV".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.sendFallbackSCSV = True settings.maxVersion = (3, 2) try: connection.handshakeClientCert(settings=settings) assert False except TLSRemoteAlert as alert: if alert.description != AlertDescription.inappropriate_fallback: raise connection.close() test_no += 1 print("Test {0} - no EtM server side".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.macNames.remove("aead") settings.maxVersion = (3, 3) assert(settings.useEncryptThenMAC) connection.handshakeClientCert(serverName=address[0], settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(not connection.encryptThenMAC) connection.close() test_no += 1 print("Test {0} - no EtM client side".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.macNames.remove("aead") settings.useEncryptThenMAC = False settings.maxVersion = (3, 3) connection.handshakeClientCert(serverName=address[0], settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(not connection.encryptThenMAC) connection.close() test_no += 1 print("Test {0} - resumption with EtM".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.macNames.remove("aead") settings.maxVersion = (3, 3) connection.handshakeClientCert(serverName=address[0], settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(not connection.resumed) assert(connection.encryptThenMAC) connection.close() session = connection.session # resume synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) connection.handshakeClientCert(serverName=address[0], session=session, settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(connection.resumed) assert(connection.encryptThenMAC) connection.close() test_no += 1 print("Test {0} - resumption with no EtM in 2nd handshake".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.macNames.remove("aead") settings.maxVersion = (3, 3) connection.handshakeClientCert(serverName=address[0], settings=settings) testConnClient(connection) assert(isinstance(connection.session.serverCertChain, X509CertChain)) assert(connection.session.serverName == address[0]) assert(not connection.resumed) assert(connection.encryptThenMAC) connection.close() session = connection.session # resume synchro.recv(1) settings = HandshakeSettings() settings.useEncryptThenMAC = False settings.macNames.remove("aead") settings.maxVersion = (3, 3) connection = connect() try: connection.handshakeClientCert(serverName=address[0], session=session, settings=settings) assert False except TLSRemoteAlert as e: assert(str(e) == "illegal_parameter") else: raise AssertionError("No exception raised") connection.close() test_no += 1 print("Test {0} - resumption in TLSv1.3".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() # force HRR settings.keyShares = [] connection.handshakeClientCert(serverName=address[0], settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverName == address[0] assert not connection.resumed assert connection.session.tickets connection.close() session = connection.session # resume synchro.recv(1) settings = HandshakeSettings() settings.keyShares = [] connection = connect() connection.handshakeClientCert(serverName=address[0], session=session, settings=settings) testConnClient(connection) assert connection.resumed connection.close() test_no += 1 print("Test {0} - resumption in TLSv1.3 with mutual X.509".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3,4) # force HRR settings.keyShares = [] connection.handshakeClientCert(x509Chain, x509Key, serverName=address[0], settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverName == address[0] assert not connection.resumed assert connection.session.tickets connection.close() session = connection.session # resume synchro.recv(1) settings = HandshakeSettings() settings.minVersion = (3,4) settings.keyShares = [] connection = connect() connection.handshakeClientCert(x509Chain, x509Key, serverName=address[0], session=session, settings=settings) testConnClient(connection) assert connection.resumed connection.close() test_no += 1 print("Test {0} - resumption in TLSv1.3 with AES-CCM tickets".format(test_no)) synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) # force HRR settings.keyShares = [] connection.handshakeClientCert(serverName=address[0], settings=settings) testConnClient(connection) assert isinstance(connection.session.serverCertChain, X509CertChain) assert connection.session.serverName == address[0] assert not connection.resumed assert connection.session.tickets connection.close() session = connection.session # resume synchro.recv(1) settings = HandshakeSettings() settings.minVersion = (3, 4) settings.keyShares = [] connection = connect() connection.handshakeClientCert(serverName=address[0], session=session, settings=settings) testConnClient(connection) assert connection.resumed connection.close() test_no += 1 print("Test {0} - Heartbeat extension response callback in TLSv1.2".format(test_no)) heartbeat_payload = os.urandom(50) def heartbeat_response_check(message): global received_payload received_payload = message.payload synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) settings.heartbeat_response_callback = heartbeat_response_check connection.handshakeClientCert(serverName=address[0], settings=settings) connection.send_heartbeat_request(heartbeat_payload, 16) testConnClient(connection) testConnClient(connection) connection.close() assert heartbeat_payload == received_payload test_no += 1 print("Test {0} - Heartbeat extension in TLSv1.3".format(test_no)) heartbeat_payload = os.urandom(50) def heartbeat_response_check(message): global received_payload received_payload = message.payload synchro.recv(1) connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) settings.heartbeat_response_callback = heartbeat_response_check connection.handshakeClientCert(serverName=address[0], settings=settings) connection.send_heartbeat_request(heartbeat_payload, 16) testConnClient(connection) testConnClient(connection) connection.close() assert heartbeat_payload == received_payload test_no += 1 print("Test {0} - KeyUpdate from client in TLSv1.3".format(test_no)) assert synchro.recv(1) == b'R' connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) connection.handshakeClientCert(serverName=address[0], settings=settings) assert synchro.recv(1) == b'K' for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested): assert i in (0, 1) assert synchro.recv(1) == b'K' testConnClient(connection) connection.close() test_no += 1 print("Test {0} - mutual KeyUpdates in TLSv1.3".format(test_no)) assert synchro.recv(1) == b'R' connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) connection.handshakeClientCert(serverName=address[0], settings=settings) for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested): assert i in (0, 1) testConnClient(connection) synchro.send(b'R') connection.close() test_no += 1 print("Test {0} - multiple mutual KeyUpdates in TLSv1.3".format(test_no)) assert synchro.recv(1) == b'R' connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) connection.handshakeClientCert(serverName=address[0], settings=settings) for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested): assert i in (0, 1) for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested): assert i in (0, 1) testConnClient(connection) synchro.send(b'R') connection.close() test_no += 1 print('Test {0} - good standard XMLRPC https client'.format(test_no)) address = address[0], address[1]+1 synchro.recv(1) try: # python 2.7.9 introduced certificate verification (context option) # python 3.4.2 doesn't have it though context = ssl.create_default_context(\ cafile=os.path.join(dir, "serverX509Cert.pem")) server = xmlrpclib.Server('https://%s:%s' % address, context=context) except (TypeError, AttributeError): server = xmlrpclib.Server('https://%s:%s' % address) synchro.recv(1) assert server.add(1,2) == 3 synchro.recv(1) assert server.pow(2,4) == 16 test_no += 1 print('Test {0} - good tlslite XMLRPC client'.format(test_no)) transport = XMLRPCTransport(ignoreAbruptClose=True) server = xmlrpclib.Server('https://%s:%s' % address, transport) synchro.recv(1) assert server.add(1,2) == 3 synchro.recv(1) assert server.pow(2,4) == 16 test_no += 1 print('Test {0} - good XMLRPC ignored protocol'.format(test_no)) server = xmlrpclib.Server('http://%s:%s' % address, transport) synchro.recv(1) assert server.add(1,2) == 3 synchro.recv(1) assert server.pow(2,4) == 16 test_no += 1 print("Test {0} - Internet servers test".format(test_no)) try: i = IMAP4_TLS("cyrus.andrew.cmu.edu") i.login("anonymous", "anonymous@anonymous.net") i.logout() test_no += 1 print("Test {0}: IMAP4 good".format(test_no)) p = POP3_TLS("pop.gmail.com") p.quit() test_no += 1 print("Test {0}: POP3 good".format(test_no)) except (socket.error, socket.timeout) as e: print("Non-critical error: socket error trying to reach internet " "server: ", e) synchro.close() if not badFault: print("Test succeeded, {0} good".format(test_no)) else: print("Test failed") def testConnServer(connection): count = 0 while 1: s = connection.read() count += len(s) if len(s) == 0: break connection.write(s) if count == 1111: break def serverTestCmd(argv): address = argv[0] dir = argv[1] #Split address into hostname/port tuple address = address.split(":") address = ( address[0], int(address[1]) ) #Create synchronisation FIFO synchroSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) synchroSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) synchroSocket.bind((address[0], address[1]-1)) synchroSocket.listen(2) #Connect to server lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) lsock.bind(address) lsock.listen(5) # following is blocking until the other side doesn't open synchro = synchroSocket.accept()[0] def connect(): s = lsock.accept()[0] s.settimeout(15) return TLSConnection(s) with open(os.path.join(dir, "serverX509Cert.pem")) as f: x509Cert = X509().parse(f.read()) x509Chain = X509CertChain([x509Cert]) with open(os.path.join(dir, "serverX509Key.pem")) as f: x509Key = parsePEMKey(f.read(), private=True) with open(os.path.join(dir, "serverRSAPSSSigCert.pem")) as f: x509CertRSAPSSSig = X509().parse(f.read()) x509ChainRSAPSSSig = X509CertChain([x509CertRSAPSSSig]) with open(os.path.join(dir, "serverRSAPSSSigKey.pem")) as f: x509KeyRSAPSSSig = parsePEMKey(f.read(), private=True) with open(os.path.join(dir, "serverRSAPSSCert.pem")) as f: x509CertRSAPSS = X509().parse(f.read()) x509ChainRSAPSS = X509CertChain([x509CertRSAPSS]) assert x509CertRSAPSS.certAlg == "rsa-pss" with open(os.path.join(dir, "serverRSAPSSKey.pem")) as f: x509KeyRSAPSS = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverECCert.pem")) as f: x509CertECDSA = X509().parse(f.read()) x509ecdsaChain = X509CertChain([x509CertECDSA]) assert x509CertECDSA.certAlg == "ecdsa" with open(os.path.join(dir, "serverECKey.pem")) as f: x509ecdsaKey = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverP384ECCert.pem")) as f: x509CertP384ECDSA = X509().parse(f.read()) x509ecdsaP384Chain = X509CertChain([x509CertP384ECDSA]) assert x509CertP384ECDSA.certAlg == "ecdsa" with open(os.path.join(dir, "serverP384ECKey.pem")) as f: x509ecdsaP384Key = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverP521ECCert.pem")) as f: x509CertP521ECDSA = X509().parse(f.read()) x509ecdsaP521Chain = X509CertChain([x509CertP521ECDSA]) assert x509CertP521ECDSA.certAlg == "ecdsa" with open(os.path.join(dir, "serverP521ECKey.pem")) as f: x509ecdsaP521Key = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverBrainpoolP256r1ECCert.pem")) as f: x509CertBrainpoolP256r1ECDSA = X509().parse(f.read()) x509ecdsaBrainpoolP256r1Chain = X509CertChain([x509CertBrainpoolP256r1ECDSA]) assert x509CertBrainpoolP256r1ECDSA.certAlg == "ecdsa" with open(os.path.join(dir, "serverBrainpoolP256r1ECKey.pem")) as f: x509ecdsaBrainpoolP256r1Key = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverBrainpoolP384r1ECCert.pem")) as f: x509CertBrainpoolP384r1ECDSA = X509().parse(f.read()) x509ecdsaBrainpoolP384r1Chain = X509CertChain([x509CertBrainpoolP384r1ECDSA]) assert x509CertBrainpoolP384r1ECDSA.certAlg == "ecdsa" with open(os.path.join(dir, "serverBrainpoolP384r1ECKey.pem")) as f: x509ecdsaBrainpoolP384r1Key = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverBrainpoolP512r1ECCert.pem")) as f: x509CertBrainpoolP512r1ECDSA = X509().parse(f.read()) x509ecdsaBrainpoolP512r1Chain = X509CertChain([x509CertBrainpoolP512r1ECDSA]) assert x509CertBrainpoolP512r1ECDSA.certAlg == "ecdsa" with open(os.path.join(dir, "serverBrainpoolP512r1ECKey.pem")) as f: x509ecdsaBrainpoolP512r1Key = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverRSANonCACert.pem")) as f: x509CertRSANonCA = X509().parse(f.read()) x509ChainRSANonCA = X509CertChain([x509CertRSANonCA]) assert x509CertRSANonCA.certAlg == "rsa" with open(os.path.join(dir, "serverRSANonCAKey.pem")) as f: x509KeyRSANonCA = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverECDSANonCACert.pem")) as f: x509CertECDSANonCA = X509().parse(f.read()) x509ChainECDSANonCA = X509CertChain([x509CertECDSANonCA]) assert x509CertECDSANonCA.certAlg == "ecdsa" with open(os.path.join(dir, "serverECDSANonCAKey.pem")) as f: x509KeyECDSANonCA = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverDSACert.pem")) as f: x509CertDSA = X509().parse(f.read()) x509ChainDSA = X509CertChain([x509CertDSA]) assert x509CertDSA.certAlg == "dsa" with open(os.path.join(dir, "serverDSAKey.pem")) as f: x509KeyDSA = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverEd25519Cert.pem")) as f: x509CertEd25519 = X509().parse(f.read()) x509Ed25519Chain = X509CertChain([x509CertEd25519]) assert x509CertEd25519.certAlg == "Ed25519" with open(os.path.join(dir, "serverEd25519Key.pem")) as f: x509Ed25519Key = parsePEMKey(f.read(), private=True, implementations=["python"]) with open(os.path.join(dir, "serverEd448Cert.pem")) as f: x509CertEd448 = X509().parse(f.read()) x509Ed448Chain = X509CertChain([x509CertEd448]) assert x509CertEd448.certAlg == "Ed448" with open(os.path.join(dir, "serverEd448Key.pem")) as f: x509Ed448Key = parsePEMKey(f.read(), private=True, implementations=["python"]) test_no = 0 print("Test {0} - Anonymous server handshake".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(anon=True) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 (plus SNI)".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) assert connection.session.serverName == address[0] assert connection.extendedMasterSecret assert connection.session.appProto is None testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 TLSv1.2 (plus ALPN)".format(test_no)) synchro.send(b'R') settings = HandshakeSettings() settings.maxVersion = (3, 3) connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, alpn=[b'http/1.1', b'http/1.0'], settings=settings) assert connection.session.serverName == address[0] assert connection.extendedMasterSecret assert connection.session.appProto == b'http/1.1' testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 TLSv1.3 (plus ALPN)".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, alpn=[b'http/1.1', b'http/1.0']) assert connection.session.serverName == address[0] assert connection.extendedMasterSecret assert connection.session.appProto == b'http/1.1' testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509/w RSA-PSS sig".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509ChainRSAPSSSig, privateKey=x509KeyRSAPSSSig) assert(connection.extendedMasterSecret) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509/w RSA-PSS cert".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509ChainRSAPSS, privateKey=x509KeyRSAPSS) assert(connection.session.serverName == address[0]) assert(connection.extendedMasterSecret) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509/w RSA-PSS cert".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeServer(certChain=x509ChainRSAPSS, privateKey=x509KeyRSAPSS, settings=settings) assert(connection.session.serverName == address[0]) assert(connection.extendedMasterSecret) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509, small record_size_limit".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.record_size_limit = 64 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509, SSLv3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3,0) settings.maxVersion = (3,0) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) assert(not connection.extendedMasterSecret) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, SSLv3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 0) settings.maxVersion = (3, 0) connection.handshakeServer(certChain=x509ecdsaChain, privateKey=x509ecdsaKey, settings=settings) assert not connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, TLSv1.0".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 1) settings.maxVersion = (3, 1) connection.handshakeServer(certChain=x509ecdsaChain, privateKey=x509ecdsaKey, settings=settings) assert connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, TLSv1.2".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeServer(certChain=x509ecdsaChain, privateKey=x509ecdsaKey, settings=settings) assert connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 print("Test {0} - mismatched ECDSA curve, TLSv1.2".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) try: connection.handshakeServer(certChain=x509ecdsaChain, privateKey=x509ecdsaKey, settings=settings) assert False except TLSLocalAlert as e: assert "curve in the public key is not supported by the client" in str(e) connection.close() test_no += 1 for curve, certChain, key in (("brainpoolP256r1", x509ecdsaBrainpoolP256r1Chain, x509ecdsaBrainpoolP256r1Key), ("brainpoolP384r1", x509ecdsaBrainpoolP384r1Chain, x509ecdsaBrainpoolP384r1Key), ("brainpoolP512r1", x509ecdsaBrainpoolP512r1Chain, x509ecdsaBrainpoolP512r1Key)): print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2".format(test_no, curve)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) settings.eccCurves = [curve, "secp256r1"] settings.keyShares = [] v_host = VirtualHost() v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)] settings.virtual_hosts = [v_host] connection.handshakeServer(certChain=certChain, privateKey=key, settings=settings) assert connection.extendedMasterSecret assert connection.session.serverCertChain == certChain testConnServer(connection) connection.close() test_no += 1 for curve, exp_chain in (("secp256r1", x509ecdsaChain), ("secp384r1", x509ecdsaP384Chain)): print("Test {0} - Two good ECDSA certs - {1}, TLSv1.2" .format(test_no, curve)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) v_host = VirtualHost() v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)] settings.virtual_hosts = [v_host] connection.handshakeServer(certChain=x509ecdsaP384Chain, privateKey=x509ecdsaP384Key, settings=settings) assert connection.extendedMasterSecret assert connection.session.serverCertChain == exp_chain testConnServer(connection) connection.close() test_no += 1 for tls_ver in ("TLSv1.2", "TLSv1,3"): print("Test {0} - good X509 RSA and ECDSA, correct RSA and ECDSA sigalgs, RSA, {1}" .format(test_no, tls_ver)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 4) v_host = VirtualHost() v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)] settings.virtual_hosts = [v_host] connection.handshakeServer(certChain=x509ChainRSANonCA, privateKey=x509KeyRSANonCA, settings=settings) assert connection.extendedMasterSecret assert connection.session.serverCertChain == x509ChainRSANonCA testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, bad RSA and good ECDSA sigalgs, ECDSA, {1}" .format(test_no, tls_ver)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 4) v_host = VirtualHost() v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)] settings.virtual_hosts = [v_host] connection.handshakeServer(certChain=x509ChainRSANonCA, privateKey=x509KeyRSANonCA, settings=settings) assert connection.extendedMasterSecret assert connection.session.serverCertChain == x509ChainECDSANonCA testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X509 RSA and ECDSA, bad RSA and ECDSA sigalgs, RSA, {1}" .format(test_no, tls_ver)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 4) v_host = VirtualHost() v_host.keys = [Keypair(x509KeyECDSANonCA, x509ChainECDSANonCA.x509List)] settings.virtual_hosts = [v_host] connection.handshakeServer(certChain=x509ChainRSANonCA, privateKey=x509KeyRSANonCA, settings=settings) assert connection.extendedMasterSecret assert connection.session.serverCertChain == x509ChainRSANonCA testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 ECDSA, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509ecdsaChain, privateKey=x509ecdsaKey, settings=settings) assert connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 # check what happens when client doesn't advertise support for signature # algoritm compatible with server key print("Test {0} - mismatched ECDSA curve, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) try: connection.handshakeServer(certChain=x509ecdsaChain, privateKey=x509ecdsaKey, settings=settings) assert False except TLSLocalAlert as e: assert "No common signature algorithms" in str(e) connection.close() test_no += 1 print("Test {0} - good X.509 P-384 ECDSA, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509ecdsaP384Chain, privateKey=x509ecdsaP384Key, settings=settings) assert connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 P-521 ECDSA, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509ecdsaP521Chain, privateKey=x509ecdsaP521Key, settings=settings) assert connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 Ed25519, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Ed25519Chain, privateKey=x509Ed25519Key, settings=settings) assert connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 Ed448, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Ed448Chain, privateKey=x509Ed448Key, settings=settings) assert connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 for prot in ["TLSv1.3", "TLSv1.2"]: for c_type, exp_chain in (("rsa", x509Chain), ("ecdsa", x509ecdsaChain)): print("Test {0} - good RSA and ECDSA, {2}, {1}" .format(test_no, c_type, prot)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 4) v_host = VirtualHost() v_host.keys = [Keypair(x509ecdsaKey, x509ecdsaChain.x509List)] settings.virtual_hosts = [v_host] connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) assert connection.extendedMasterSecret assert connection.session.serverCertChain == exp_chain testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509, mismatched key_share".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.eccCurves = ["secp256r1", "secp384r1", "secp521r1"] settings.keyShares = ["secp256r1"] connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509, RC4-MD5".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.macNames = ["sha", "md5"] settings.cipherNames = ["rc4"] connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() if tackpyLoaded: tack = Tack.createFromPem( open(os.path.join(dir, "TACK1.pem"), "rU").read()) tackUnrelated = Tack.createFromPem( open(os.path.join(dir, "TACKunrelated.pem"), "rU").read()) settings = HandshakeSettings() settings.useExperimentalTackExtension = True test_no += 1 print("Test {0} - good X.509, TACK".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, tacks=[tack], activationFlags=1, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509, TACK unrelated to cert chain".\ format(test_no)) synchro.send(b'R') connection = connect() try: connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, tacks=[tackUnrelated], settings=settings) assert False except TLSRemoteAlert as alert: if alert.description != AlertDescription.illegal_parameter: raise else: test_no += 1 print("Test {0} - good X.509, TACK...skipped (no tackpy)".\ format(test_no)) test_no += 1 print("Test {0} - good X.509, TACK unrelated to cert chain" "...skipped (no tackpy)".format(test_no)) test_no += 1 print("Test {0} - good PSK".format(test_no)) synchro.send(b'R') settings = HandshakeSettings() settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')] connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good PSK, no DH".format(test_no)) synchro.send(b'R') settings = HandshakeSettings() settings.psk_modes = ["psk_ke"] settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')] connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good PSK, no DH, no cert".format(test_no)) synchro.send(b'R') settings = HandshakeSettings() settings.psk_modes = ["psk_ke"] settings.pskConfigs = [(b'test', b'\x00secret', 'sha384')] connection = connect() connection.handshakeServer(settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good SRP (db)".format(test_no)) try: import logging logging.basicConfig(level=logging.DEBUG) (db_file, db_name) = mkstemp() print("server {0} - tmp file created".format(time.time())) os.close(db_file) print("server {0} - tmp file closed".format(time.time())) # this is race'y but the interface dbm interface is stupid like that... os.remove(db_name) print("server {0} - tmp file removed".format(time.time())) verifierDB = VerifierDB(db_name) print("server {0} - verifier initialised".format(time.time())) verifierDB.create() print("server {0} - verifier created".format(time.time())) entry = VerifierDB.makeVerifier("test", "password", 1536) print("server {0} - entry created".format(time.time())) verifierDB[b"test"] = entry print("server {0} - entry added".format(time.time())) synchro.send(b'R') print("server {0} - synchro sent".format(time.time())) connection = connect() connection.handshakeServer(verifierDB=verifierDB) testConnServer(connection) connection.close() finally: try: os.remove(db_name) except FileNotFoundError: # dbm module may create files with different names depending on # platform os.remove(db_name + ".dat") test_no += 1 print("Test {0} - good SRP".format(test_no)) verifierDB = VerifierDB() verifierDB.create() entry = VerifierDB.makeVerifier("test", "password", 1536) verifierDB[b"test"] = entry synchro.send(b'R') connection = connect() connection.handshakeServer(verifierDB=verifierDB) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - SRP faults".format(test_no)) for fault in Fault.clientSrpFaults + Fault.genericFaults: synchro.send(b'R') connection = connect() connection.fault = fault connection.handshakeServer(verifierDB=verifierDB) connection.close() test_no += 1 print("Test {0} - good SRP: with X.509 certificate, TLSv1.0".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(verifierDB=verifierDB, \ certChain=x509Chain, privateKey=x509Key) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - X.509 with SRP faults".format(test_no)) for fault in Fault.clientSrpFaults + Fault.genericFaults: synchro.send(b'R') connection = connect() connection.fault = fault connection.handshakeServer(verifierDB=verifierDB, \ certChain=x509Chain, privateKey=x509Key) connection.close() test_no += 1 print("Test {0} - X.509 faults".format(test_no)) for fault in Fault.clientNoAuthFaults + Fault.genericFaults: synchro.send(b'R') connection = connect() connection.fault = fault connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) connection.close() test_no += 1 print("Test {0} - good mutual X.509".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) testConnServer(connection) assert(isinstance(connection.session.clientCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good mutual ECDSA X.509".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509ecdsaChain, privateKey=x509ecdsaKey, reqCert=True) testConnServer(connection) assert(isinstance(connection.session.clientCertChain, X509CertChain)) assert len(connection.session.clientCertChain.getEndEntityPublicKey()) ==\ 256 connection.close() test_no += 1 print("Test {0} - good mutual Ed25519 X.509".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Ed25519Chain, privateKey=x509Ed25519Key, reqCert=True) testConnServer(connection) assert(isinstance(connection.session.clientCertChain, X509CertChain)) assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good mutual Ed25519 X.509, TLS 1.2".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeServer(certChain=x509Ed25519Chain, privateKey=x509Ed25519Key, reqCert=True, settings=settings) testConnServer(connection) assert(isinstance(connection.session.clientCertChain, X509CertChain)) assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good X.509 DSA, SSLv3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 0) settings.maxVersion = (3, 0) connection.handshakeServer(certChain=x509ChainDSA, privateKey=x509KeyDSA, settings=settings) assert not connection.extendedMasterSecret testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 DSA, TLSv1.2".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeServer(certChain=x509ChainDSA, privateKey=x509KeyDSA, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 Ed25519, TLSv1.2".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeServer(certChain=x509Ed25519Chain, privateKey=x509Ed25519Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good X.509 Ed448, TLSv1.2".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 3) settings.maxVersion = (3, 3) connection.handshakeServer(certChain=x509Ed448Chain, privateKey=x509Ed448Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - good mutual X.509, TLSv1.3 no certs".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3,4) settings.maxVersion = (3,4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) testConnServer(connection) assert not connection.session.clientCertChain connection.close() test_no += 1 print("Test {0} - good mutual X.509, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3,4) settings.maxVersion = (3,4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) testConnServer(connection) assert isinstance(connection.session.clientCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good mutual X.509, PHA, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) assert connection.session.clientCertChain is None for result in connection.request_post_handshake_auth(settings): assert result in (0, 1) synchro.send(b'R') testConnServer(connection) assert connection.session.clientCertChain is not None assert isinstance(connection.session.clientCertChain, X509CertChain) connection.close() test_no += 1 print("Test {0} - good mutual X.509 Ed25519, PHA, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Ed25519Chain, privateKey=x509Ed25519Key, settings=settings) assert connection.session.clientCertChain is None for result in connection.request_post_handshake_auth(settings): assert result in (0, 1) synchro.send(b'R') testConnServer(connection) assert connection.session.clientCertChain is not None assert isinstance(connection.session.clientCertChain, X509CertChain) assert connection.session.clientCertChain.getEndEntityPublicKey().key_type\ == "Ed25519" connection.close() test_no += 1 print("Test {0} - good mutual X.509, PHA and KeyUpdate, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) assert connection.session.clientCertChain is None for result in connection.request_post_handshake_auth(settings): assert result in (0, 1) synchro.send(b'R') assert connection.read(0, 0) == b'' assert connection.session.clientCertChain is not None assert isinstance(connection.session.clientCertChain, X509CertChain) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - mutual X.509, PHA, no client cert, TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) connection.client_cert_required = True assert connection.session.clientCertChain is None for result in connection.request_post_handshake_auth(settings): assert result in (0, 1) synchro.send(b'R') try: testConnServer(connection) assert False except TLSLocalAlert as e: assert "Client did not provide a certificate in post-handshake" in \ str(e) assert e.description == AlertDescription.certificate_required assert connection.session.clientCertChain is None connection.close() test_no += 1 print("Test {0} - good mutual X.509, TLSv1.1".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3,2) settings.maxVersion = (3,2) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) testConnServer(connection) assert(isinstance(connection.session.clientCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - good mutual X.509, SSLv3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3,0) settings.maxVersion = (3,0) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) testConnServer(connection) assert(isinstance(connection.session.clientCertChain, X509CertChain)) connection.close() test_no += 1 print("Test {0} - mutual X.509 faults".format(test_no)) for fault in Fault.clientCertFaults + Fault.genericFaults: synchro.send(b'R') connection = connect() connection.fault = fault connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True) connection.close() test_no += 1 print("Test {0} - good SRP, prepare to resume".format(test_no)) synchro.send(b'R') sessionCache = SessionCache() connection = connect() connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) assert(connection.session.serverName == address[0]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - resumption (plus SNI)".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) assert(connection.session.serverName == address[0]) testConnServer(connection) #Don't close! -- see next test test_no += 1 print("Test {0} - invalidated resumption (plus SNI)".format(test_no)) synchro.send(b'R') try: connection.read(min=1, max=1) assert False #Client is going to close the socket without a close_notify except TLSAbruptCloseError as e: pass synchro.send(b'R') connection = connect() try: connection.handshakeServer(verifierDB=verifierDB, sessionCache=sessionCache) assert False except TLSLocalAlert as alert: if alert.description != AlertDescription.bad_record_mac: raise connection.close() test_no += 1 print("Test {0} - HTTPS test X.509".format(test_no)) #Close the current listening socket lsock.close() #Create and run an HTTP Server using TLSSocketServerMixIn class MyHTTPServer(TLSSocketServerMixIn, HTTPServer): def handshake(self, tlsConnection): tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key) return True def server_bind(self): self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) HTTPServer.server_bind(self) cd = os.getcwd() os.chdir(dir) address = address[0], address[1]+1 httpd = MyHTTPServer(address, SimpleHTTPRequestHandler) for x in range(6): synchro.send(b'R') httpd.handle_request() httpd.server_close() cd = os.chdir(cd) #Re-connect the listening socket lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) lsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) address = address[0], address[1]+1 lsock.bind(address) lsock.listen(5) implementations = [] if m2cryptoLoaded: implementations.append("openssl") if pycryptoLoaded: implementations.append("pycrypto") implementations.append("python") test_no += 1 print("Test {0} - different ciphers, TLSv1.0".format(test_no)) for implementation in ["python"] * len(implementations): for cipher in ["aes128", "aes256", "rc4"]: test_no += 1 print("Test {0}:".format(test_no), end=' ') synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.cipherNames = [cipher] settings.cipherImplementations = [implementation, "python"] connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) print(connection.getCipherName(), connection.getCipherImplementation()) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - throughput test".format(test_no)) for implementation in implementations: for cipher in ["aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8", "aes128gcm", "aes256gcm", "aes128", "aes256", "3des", "rc4", "chacha20-poly1305_draft00", "chacha20-poly1305"]: # skip tests with implementations that don't support them if cipher == "3des" and implementation not in ("openssl", "pycrypto"): continue if cipher in ("aes128gcm", "aes256gcm") and \ implementation not in ("pycrypto", "python", "openssl"): continue if cipher in ("aes128ccm", "aes128ccm_8", "aes256ccm", "aes256ccm_8") and \ implementation not in ("python", "openssl"): continue if cipher in ("chacha20-poly1305_draft00", "chacha20-poly1305") \ and implementation not in ("python", ): continue test_no += 1 print("Test {0}:".format(test_no), end=' ') synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.cipherNames = [cipher] settings.cipherImplementations = [implementation, "python"] connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) print(connection.getCipherName(), connection.getCipherImplementation()) h = connection.read(min=50000, max=50000) assert(h == b"hello"*10000) connection.write(h) connection.close() test_no += 1 print("Test {0} - Next-Protocol Server Negotiation".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings, nextProtos=[b"http/1.1"]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - Next-Protocol Server Negotiation".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings, nextProtos=[b"spdy/2", b"http/1.1"]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - Next-Protocol Server Negotiation".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings, nextProtos=[b"http/1.1", b"spdy/2"]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - Next-Protocol Server Negotiation".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings, nextProtos=[b"spdy/2", b"http/1.1"]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - Next-Protocol Server Negotiation".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings, nextProtos=[b"http/1.1", b"spdy/2", b"spdy/3"]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - Next-Protocol Server Negotiation".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings, nextProtos=[b"spdy/3", b"spdy/2"]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - Next-Protocol Server Negotiation".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings, nextProtos=[]) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - FALLBACK_SCSV".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) # TODO fix FALLBACK with TLS1.3 connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - FALLBACK_SCSV".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() # TODO fix FALLBACK with TLS1.3 settings.maxVersion = (3, 3) try: connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) assert False except TLSLocalAlert as alert: if alert.description != AlertDescription.inappropriate_fallback: raise connection.close() test_no += 1 print("Test {0} - no EtM server side".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.useEncryptThenMAC = False connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - no EtM client side".format(test_no)) synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - resumption with EtM".format(test_no)) synchro.send(b'R') sessionCache = SessionCache() connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, sessionCache=sessionCache) testConnServer(connection) connection.close() # resume synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, sessionCache=sessionCache) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - resumption with no EtM in 2nd handshake".format(test_no)) synchro.send(b'R') sessionCache = SessionCache() connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, sessionCache=sessionCache) testConnServer(connection) connection.close() # resume synchro.send(b'R') connection = connect() try: connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, sessionCache=sessionCache) assert False except TLSLocalAlert as e: assert(str(e) == "illegal_parameter") else: raise AssertionError("no exception raised") connection.close() test_no += 1 print("Test {0} - resumption in TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3,4) settings.ticketKeys = [getRandomBytes(32)] connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() # resume synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - resumption in TLSv1.3 with mutual X.509".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3,4) settings.ticketKeys = [getRandomBytes(32)] connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) testConnServer(connection) connection.close() # resume synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, reqCert=True, settings=settings) testConnServer(connection) assert connection.session.clientCertChain connection.close() test_no += 1 print("Test {0} - resumption in TLSv1.3 with AES-CCM tickets".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.minVersion = (3, 4) settings.ticketKeys = [getRandomBytes(32)] settings.ticketCipher = "aes128ccm" connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() # resume synchro.send(b'R') connection = connect() connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) testConnServer(connection) connection.close() test_no += 1 print("Test {0} - Heartbeat extension response callback in TLSv1.2".format(test_no)) heartbeat_payload = os.urandom(50) def heartbeat_response_check(message): global received_payload received_payload = message.payload synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 3) settings.heartbeat_response_callback = heartbeat_response_check connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) connection.send_heartbeat_request(heartbeat_payload, 16) testConnServer(connection) testConnServer(connection) connection.close() assert heartbeat_payload == received_payload test_no += 1 print("Test {0} - Heartbeat extension in TLSv1.3".format(test_no)) heartbeat_payload = os.urandom(50) def heartbeat_response_check(message): global received_payload received_payload = message.payload synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) settings.heartbeat_response_callback = heartbeat_response_check connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) connection.send_heartbeat_request(heartbeat_payload, 16) testConnServer(connection) testConnServer(connection) connection.close() assert heartbeat_payload == received_payload test_no += 1 print("Test {0} - KeyUpdate from client in TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) synchro.send(b'K') synchro.send(b'K') testConnServer(connection) connection.close() test_no += 1 print("Test {0} - mutual KeyUpdates in TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested): assert i in (0, 1) testConnServer(connection) assert synchro.recv(1) == b'R' connection.close() test_no += 1 print("Test {0} - multiple mutual KeyUpdates in TLSv1.3".format(test_no)) synchro.send(b'R') connection = connect() settings = HandshakeSettings() settings.maxVersion = (3, 4) connection.handshakeServer(certChain=x509Chain, privateKey=x509Key, settings=settings) for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested): assert i in (0, 1) for i in connection.send_keyupdate_request(KeyUpdateMessageType.update_requested): assert i in (0, 1) testConnServer(connection) assert synchro.recv(1) == b'R' connection.close() test_no += 1 print("Tests {0}-{1} - XMLRPXC server".format(test_no, test_no + 2)) address = address[0], address[1]+1 class Server(TLSXMLRPCServer): def handshake(self, tlsConnection): try: tlsConnection.handshakeServer(certChain=x509Chain, privateKey=x509Key, sessionCache=sessionCache) tlsConnection.ignoreAbruptClose = True return True except TLSError as error: print("Handshake failure:", str(error)) return False class MyFuncs: def pow(self, x, y): return pow(x, y) def add(self, x, y): return x + y server = Server(address) server.register_instance(MyFuncs()) synchro.send(b'R') #sa = server.socket.getsockname() #print "Serving HTTPS on", sa[0], "port", sa[1] for i in range(6): synchro.send(b'R') server.handle_request() synchro.close() synchroSocket.close() test_no += 2 print("Test succeeded") if __name__ == '__main__': if len(sys.argv) < 2: printUsage("Missing command") elif sys.argv[1] == "client"[:len(sys.argv[1])]: clientTestCmd(sys.argv[2:]) elif sys.argv[1] == "server"[:len(sys.argv[1])]: serverTestCmd(sys.argv[2:]) else: printUsage("Unknown command: %s" % sys.argv[1])