def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
return self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
python类UnrecognizedExtension()的实例源码
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
return self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._create_raw_x509_extension(extension, value)
elif isinstance(extension.value, x509.TLSFeature):
asn1 = _Integers([x.value for x in extension.value]).dump()
value = _encode_asn1_str_gc(self, asn1, len(asn1))
return self._create_raw_x509_extension(extension, value)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
return self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
# This OID is only supported in OpenSSL 1.1.0+ but we want
# to support it in all versions of OpenSSL so we decode it
# ourselves.
if oid == ExtensionOID.TLS_FEATURE:
data = backend._lib.X509_EXTENSION_get_data(ext)
parsed = _Integers.load(_asn1_string_to_bytes(backend, data))
value = x509.TLSFeature(
[_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed]
)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
continue
try:
handler = self.handlers[oid]
except KeyError:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)
def _create_x509_extension(self, handlers, extension):
if isinstance(extension.value, x509.UnrecognizedExtension):
obj = _txt2obj_gc(self, extension.oid.dotted_string)
value = _encode_asn1_str_gc(
self, extension.value.value, len(extension.value.value)
)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
value
)
else:
try:
encode = handlers[extension.oid]
except KeyError:
raise NotImplementedError(
'Extension not supported: {0}'.format(extension.oid)
)
ext_struct = encode(self, extension.value)
nid = self._lib.OBJ_txt2nid(
extension.oid.dotted_string.encode("ascii")
)
backend.openssl_assert(nid != self._lib.NID_undef)
x509_extension = self._lib.X509V3_EXT_i2d(
nid, 1 if extension.critical else 0, ext_struct
)
if (
x509_extension == self._ffi.NULL and
extension.oid == x509.OID_CERTIFICATE_ISSUER
):
# This path exists to support OpenSSL 0.9.8, which does not
# know how to encode a CERTIFICATE_ISSUER for CRLs. Once we
# drop 0.9.8 support we can remove this.
self._consume_errors()
pp = backend._ffi.new("unsigned char **")
r = self._lib.i2d_GENERAL_NAMES(ext_struct, pp)
backend.openssl_assert(r > 0)
pp = backend._ffi.gc(
pp,
lambda pointer: backend._lib.OPENSSL_free(pointer[0])
)
obj = _txt2obj_gc(self, extension.oid.dotted_string)
return self._lib.X509_EXTENSION_create_by_OBJ(
self._ffi.NULL,
obj,
1 if extension.critical else 0,
_encode_asn1_str_gc(self, pp[0], r)
)
return x509_extension
def parse(self, backend, x509_obj):
extensions = []
seen_oids = set()
for i in range(self.ext_count(backend, x509_obj)):
ext = self.get_ext(backend, x509_obj, i)
backend.openssl_assert(ext != backend._ffi.NULL)
crit = backend._lib.X509_EXTENSION_get_critical(ext)
critical = crit == 1
oid = x509.ObjectIdentifier(
_obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext))
)
if oid in seen_oids:
raise x509.DuplicateExtension(
"Duplicate {0} extension found".format(oid), oid
)
try:
handler = self.handlers[oid]
except KeyError:
if critical:
raise x509.UnsupportedExtension(
"Critical extension {0} is not currently supported"
.format(oid), oid
)
else:
# Dump the DER payload into an UnrecognizedExtension object
data = backend._lib.X509_EXTENSION_get_data(ext)
backend.openssl_assert(data != backend._ffi.NULL)
der = backend._ffi.buffer(data.data, data.length)[:]
unrecognized = x509.UnrecognizedExtension(oid, der)
extensions.append(
x509.Extension(oid, critical, unrecognized)
)
else:
# For extensions which are not supported by OpenSSL we pass the
# extension object directly to the parsing routine so it can
# be decoded manually.
if self.unsupported_exts and oid in self.unsupported_exts:
ext_data = ext
else:
ext_data = backend._lib.X509V3_EXT_d2i(ext)
if ext_data == backend._ffi.NULL:
backend._consume_errors()
raise ValueError(
"The {0} extension is invalid and can't be "
"parsed".format(oid)
)
value = handler(backend, ext_data)
extensions.append(x509.Extension(oid, critical, value))
seen_oids.add(oid)
return x509.Extensions(extensions)