"""Class for handling primary OCSP responses"""

from .utils.asn1parser import ASN1Parser
from .utils.cryptomath import bytesToNumber, numBytes, secureHash
from .x509 import X509
from .signed import SignedObject
from .errors import TLSIllegalParameterException


class OCSPRespStatus(object):
    """ OCSP response status codes (RFC 2560) """

    successful = 0
    malformedRequest = 1
    internalError = 2
    tryLater = 3
    # 4 is not used to match RFC2560 specification
    sigRequired = 5
    unauthorized = 6


class CertStatus(object):
    """ Certificate status in an OCSP response """

    good, revoked, unknown = range(3)


class SingleResponse(object):
    """ This class represents SingleResponse ASN1 type (defined in RFC2560) """

    def __init__(self, value):
        """
        Store the ASN.1 node and immediately parse it.

        :param value: parsed ASN.1 node of a SingleResponse; it must provide
            ``getChild()`` and the children must expose ``value`` and ``type``
        """
        self.value = value
        self.cert_hash_alg = None
        self.cert_issuer_name_hash = None
        self.cert_issuer_key_hash = None
        self.cert_serial_num = None
        self.cert_status = None
        self.this_update = None
        self.next_update = None
        self.parse(value)

    # Encoded hash algorithm OIDs (as tuples of byte values) mapped to the
    # algorithm name handed to secureHash()
    _hash_algs_OIDs = {
        tuple([0x2a, 0x86, 0x48, 0x86, 0xf7, 0xd, 0x2, 0x5]): 'md5',
        tuple([0x2b, 0xe, 0x3, 0x2, 0x1a]): 'sha1',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x4]): 'sha224',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x1]): 'sha256',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x2]): 'sha384',
        tuple([0x60, 0x86, 0x48, 0x1, 0x65, 0x3, 0x4, 0x2, 0x3]): 'sha512'
    }

    def parse(self, value):
        """
        Populate the instance attributes from a SingleResponse ASN.1 node.

        :param value: parsed ASN.1 node of a SingleResponse
        """
        cert_id = value.getChild(0)
        self.cert_hash_alg = cert_id.getChild(0).getChild(0).value
        self.cert_issuer_name_hash = cert_id.getChild(1).value
        self.cert_issuer_key_hash = cert_id.getChild(2).value
        self.cert_serial_num = bytesToNumber(cert_id.getChild(3).value)
        self.cert_status = value.getChild(1).value
        self.this_update = value.getChild(2).value
        # next_update is optional: a missing fourth child is reported by the
        # parser as SyntaxError, and a fourth child with a different tag is
        # some other optional field, so next_update stays None in both cases
        try:
            fld = value.getChild(3)
            if fld.type.tag_id == 0:
                self.next_update = fld.value
        except SyntaxError:
            self.next_update = None

    def verify_cert_match(self, server_cert, issuer_cert):
        """
        Check that this response refers to the given certificate.

        The issuer's public key and name are hashed with the algorithm named
        in the response and compared with the hashes stored in it; then the
        serial numbers are compared.

        :param server_cert: certificate the response should describe; its
            ``serial_number`` attribute is read
        :param issuer_cert: certificate of the issuer; its
            ``subject_public_key`` and ``subject`` attributes are read
        :returns: True when all three checks pass
        :raises TLSIllegalParameterException: when the hash algorithm OID in
            the response is not one of the known ones
        :raises ValueError: when the key hash, the name hash or the serial
            number does not match
        """
        # extract subject public key
        issuer_key = issuer_cert.subject_public_key
        # extract issuer DN
        issuer_name = issuer_cert.subject

        try:
            alg = self._hash_algs_OIDs[tuple(self.cert_hash_alg)]
        except KeyError:
            raise TLSIllegalParameterException(
                "Unknown hash algorithm: {0}".format(
                    list(self.cert_hash_alg)))

        # hash issuer key
        hashed_key = secureHash(issuer_key, alg)
        if hashed_key != self.cert_issuer_key_hash:
            raise ValueError("Could not verify certificate public key")

        # hash issuer name
        hashed_name = secureHash(issuer_name, alg)
        if hashed_name != self.cert_issuer_name_hash:
            raise ValueError("Could not verify certificate DN")

        # serial number
        if server_cert.serial_number != self.cert_serial_num:
            raise ValueError("Could not verify certificate serial number")

        return True


class OCSPResponse(SignedObject):
    """ This class represents an OCSP response. """

    def __init__(self, value):
        """
        Create the object and parse the provided response.

        :type value: stream of bytes
        :param value: a DER-encoded OCSP response
        """
        super(OCSPResponse, self).__init__()
        self.bytes = None
        self.resp_status = None
        self.resp_type = None
        self.version = None
        self.resp_id = None
        self.produced_at = None
        self.responses = []
        self.certs = []
        self.parse(value)

    def parse(self, value):
        """
        Parse a DER-encoded OCSP response.

        When the response status is not ``successful`` only ``bytes`` and
        ``resp_status`` are set from the input; the remaining fields are not
        parsed.

        :type value: stream of bytes
        :param value: a DER-encoded OCSP response
        :returns: this object
        :raises SyntaxError: when the response type is not
            id-pkix-ocsp-basic
        """
        self.bytes = bytearray(value)
        # start from empty lists so that calling parse() again on the same
        # object does not accumulate entries from the previous response
        self.responses = []
        self.certs = []

        parser = ASN1Parser(self.bytes)
        resp_status = parser.getChild(0)
        self.resp_status = resp_status.value[0]
        # if the response status is not successful, abort parsing other fields
        if self.resp_status != OCSPRespStatus.successful:
            return self

        resp_bytes = parser.getChild(1).getChild(0)
        self.resp_type = resp_bytes.getChild(0).value
        response = resp_bytes.getChild(1)
        # check if response is id-pkix-ocsp-basic
        if list(self.resp_type) != [43, 6, 1, 5, 5, 7, 48, 1, 1]:
            raise SyntaxError(
                "Unsupported OCSP response type: {0}".format(
                    list(self.resp_type)))

        basic_resp = response.getChild(0)
        # parsing tbsResponseData fields
        self._tbsdataparse(basic_resp.getChild(0))
        # raw encoding of tbsResponseData, i.e. the data the signature covers
        self.tbs_data = basic_resp.getChildBytes(0)
        self.signature_alg = basic_resp.getChild(1).getChild(0).value
        self.signature = basic_resp.getChild(2).value

        # test if certs field is present
        if basic_resp.getChildCount() > 3:
            certs = basic_resp.getChild(3)
            cnt = certs.getChildCount()
            for i in range(cnt):
                certificate = X509()
                certificate.parseBinary(certs.getChild(i).value)
                self.certs.append(certificate)
        return self

    def _tbsdataparse(self, value):
        """
        Parse to be signed data.

        Sets ``version``, ``resp_id`` and ``produced_at`` and appends one
        :py:class:`SingleResponse` to ``responses`` for every entry found.

        :param value: parsed ASN.1 node of the TBS data (it is queried with
            ``getChild()``, it is not a raw byte stream)
        """
        # test if version is omitted
        field = value.getChild(0)
        cnt = 0
        if field.type.tag_id == 0:
            # version is not omitted, the following fields are shifted by one
            cnt += 1
            self.version = field.value
        else:
            self.version = 1

        self.resp_id = value.getChild(cnt).value
        self.produced_at = value.getChild(cnt+1).value

        responses = value.getChild(cnt+2)
        resp_cnt = responses.getChildCount()
        for i in range(resp_cnt):
            resp = responses.getChild(i)
            parsed_resp = SingleResponse(resp)
            self.responses.append(parsed_resp)