def _load_cached_tlds(self):
"""
Loads TLDs from cached file to set.
:return: Set of current TLDs
:rtype: set
"""
list_of_tlds = set()
with open(self._tld_list_path, 'r') as f:
for line in f:
tld = line.strip().lower()
# skip empty lines
if len(tld) <= 0:
continue
# skip comments
if tld[0] == '#':
continue
list_of_tlds.add("." + tld)
list_of_tlds.add("." + idna.decode(tld))
return list_of_tlds
python类decode()的实例源码
def normalize_name(name):
'''
Clean the fully qualified name, as defined in ENS `EIP-137
<https://github.com/ethereum/EIPs/blob/master/EIPS/eip-137.md#name-syntax>`_
This does *not* enforce whether ``name`` is a label or fully qualified domain.
:param str name: the dot-separated ENS name
:raises InvalidName: if ``name`` has invalid syntax
'''
if not name:
return name
elif isinstance(name, (bytes, bytearray)):
name = name.decode('utf-8')
try:
return idna.decode(name, uts46=True, std3_rules=True)
except idna.IDNAError as exc:
raise InvalidName("%s is an invalid name, because %s" % (name, exc)) from exc
def _idnaText(octets):
"""
Convert some IDNA-encoded octets into some human-readable text.
Currently only used by the tests.
@param octets: Some bytes representing a hostname.
@type octets: L{bytes}
@return: A human-readable domain name.
@rtype: L{unicode}
"""
try:
import idna
except ImportError:
return octets.decode("idna")
else:
return idna.decode(octets)
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
#
# But OIDs longer than this occur in real life (e.g. Active
# Directory makes some very long OIDs). So we need to detect
# and properly handle the case where the default buffer is not
# big enough.
#
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
# 'res' is the number of bytes that *would* be written if the
# buffer is large enough. If 'res' > buf_len - 1, we need to
# alloc a big-enough buffer and go again.
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
if res > buf_len - 1: # account for terminating null byte
buf_len = res + 1
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _decode_certificate_policies(backend, cp):
cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
cp = backend._ffi.gc(cp, backend._lib.sk_POLICYINFO_free)
num = backend._lib.sk_POLICYINFO_num(cp)
certificate_policies = []
for i in range(num):
qualifiers = None
pi = backend._lib.sk_POLICYINFO_value(cp, i)
oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
if pi.qualifiers != backend._ffi.NULL:
qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
qualifiers = []
for j in range(qnum):
pqi = backend._lib.sk_POLICYQUALINFO_value(
pi.qualifiers, j
)
pqualid = x509.ObjectIdentifier(
_obj2txt(backend, pqi.pqualid)
)
if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
cpsuri = backend._ffi.buffer(
pqi.d.cpsuri.data, pqi.d.cpsuri.length
)[:].decode('ascii')
qualifiers.append(cpsuri)
else:
assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
user_notice = _decode_user_notice(
backend, pqi.d.usernotice
)
qualifiers.append(user_notice)
certificate_policies.append(
x509.PolicyInformation(oid, qualifiers)
)
return x509.CertificatePolicies(certificate_policies)
def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
def _asn1_string_to_utf8(backend, asn1_string):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
"Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
def _asn1_string_to_utf8(backend, asn1_string):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
"Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
def _lift_parse_result(parse_rs):
if isinstance(parse_rs, Mailbox):
try:
return EmailAddress(
display_name=smart_unquote(parse_rs.display_name.decode('utf-8')),
mailbox=parse_rs.local_part.decode('utf-8'),
hostname=parse_rs.domain.decode('utf-8'))
except (UnicodeError, IDNAError):
return None
if isinstance(parse_rs, Url):
return UrlAddress(address=parse_rs.address.decode('utf-8'))
return None
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _decode_certificate_policies(backend, cp):
cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
cp = backend._ffi.gc(cp, backend._lib.sk_POLICYINFO_free)
num = backend._lib.sk_POLICYINFO_num(cp)
certificate_policies = []
for i in range(num):
qualifiers = None
pi = backend._lib.sk_POLICYINFO_value(cp, i)
oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
if pi.qualifiers != backend._ffi.NULL:
qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
qualifiers = []
for j in range(qnum):
pqi = backend._lib.sk_POLICYQUALINFO_value(
pi.qualifiers, j
)
pqualid = x509.ObjectIdentifier(
_obj2txt(backend, pqi.pqualid)
)
if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
cpsuri = backend._ffi.buffer(
pqi.d.cpsuri.data, pqi.d.cpsuri.length
)[:].decode('ascii')
qualifiers.append(cpsuri)
else:
assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
user_notice = _decode_user_notice(
backend, pqi.d.usernotice
)
qualifiers.append(user_notice)
certificate_policies.append(
x509.PolicyInformation(oid, qualifiers)
)
return x509.CertificatePolicies(certificate_policies)
def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
def _asn1_string_to_utf8(backend, asn1_string):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
"Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _decode_certificate_policies(backend, cp):
cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
cp = backend._ffi.gc(cp, backend._lib.sk_POLICYINFO_free)
num = backend._lib.sk_POLICYINFO_num(cp)
certificate_policies = []
for i in range(num):
qualifiers = None
pi = backend._lib.sk_POLICYINFO_value(cp, i)
oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
if pi.qualifiers != backend._ffi.NULL:
qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
qualifiers = []
for j in range(qnum):
pqi = backend._lib.sk_POLICYQUALINFO_value(
pi.qualifiers, j
)
pqualid = x509.ObjectIdentifier(
_obj2txt(backend, pqi.pqualid)
)
if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
cpsuri = backend._ffi.buffer(
pqi.d.cpsuri.data, pqi.d.cpsuri.length
)[:].decode('ascii')
qualifiers.append(cpsuri)
else:
assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
user_notice = _decode_user_notice(
backend, pqi.d.usernotice
)
qualifiers.append(user_notice)
certificate_policies.append(
x509.PolicyInformation(oid, qualifiers)
)
return x509.CertificatePolicies(certificate_policies)
def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
def _asn1_string_to_utf8(backend, asn1_string):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
"Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
def _asn1_string_to_utf8(backend, asn1_string):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
"Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
def _download_tlds_list(self):
"""
Function downloads list of TLDs from IANA 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt'.
:return: True if list was downloaded, False in case of an error
:rtype: bool
"""
url_list = 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt'
# check if we can write cache file
if os.access(self._tld_list_path, os.F_OK) and not os.access(self._tld_list_path, os.W_OK):
print("ERROR: Cache file is not writable for current user. ({})".format(self._tld_list_path))
return False
req = urllib.request.Request(url_list)
req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.0; WOW64; rv:24.0) Gecko/20100101 Firefox/24.0')
with open(self._tld_list_path, 'w') as ftld:
try:
with urllib.request.urlopen(req) as f:
page = f.read().decode('utf-8')
ftld.write(page)
except HTTPError as e:
print("ERROR: Can not download list ot TLDs. (HTTPError: {})".format(e.reason))
return False
except URLError as e:
print("ERROR: Can not download list ot TLDs. (URLError: {})".format(e.reason))
return False
return True
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _decode_certificate_policies(backend, cp):
cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
cp = backend._ffi.gc(cp, backend._lib.sk_POLICYINFO_free)
num = backend._lib.sk_POLICYINFO_num(cp)
certificate_policies = []
for i in range(num):
qualifiers = None
pi = backend._lib.sk_POLICYINFO_value(cp, i)
oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
if pi.qualifiers != backend._ffi.NULL:
qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
qualifiers = []
for j in range(qnum):
pqi = backend._lib.sk_POLICYQUALINFO_value(
pi.qualifiers, j
)
pqualid = x509.ObjectIdentifier(
_obj2txt(backend, pqi.pqualid)
)
if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
cpsuri = backend._ffi.buffer(
pqi.d.cpsuri.data, pqi.d.cpsuri.length
)[:].decode('ascii')
qualifiers.append(cpsuri)
else:
assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE
user_notice = _decode_user_notice(
backend, pqi.d.usernotice
)
qualifiers.append(user_notice)
certificate_policies.append(
x509.PolicyInformation(oid, qualifiers)
)
return x509.CertificatePolicies(certificate_policies)
def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")
def _asn1_string_to_utf8(backend, asn1_string):
buf = backend._ffi.new("unsigned char **")
res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string)
if res == -1:
raise ValueError(
"Unsupported ASN1 string type. Type: {0}".format(asn1_string.type)
)
backend.openssl_assert(buf[0] != backend._ffi.NULL)
buf = backend._ffi.gc(
buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0])
)
return backend._ffi.buffer(buf[0], res)[:].decode('utf8')
def _obj2txt(backend, obj):
# Set to 80 on the recommendation of
# https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
buf_len = 80
buf = backend._ffi.new("char[]", buf_len)
res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
backend.openssl_assert(res > 0)
return backend._ffi.buffer(buf, res)[:].decode()
def _asn1_string_to_ascii(backend, asn1_string):
return _asn1_string_to_bytes(backend, asn1_string).decode("ascii")