def fetch_inventory(app, uri, inv):
"""Fetch, parse and return an intersphinx inventory file."""
# both *uri* (base URI of the links to generate) and *inv* (actual
# location of the inventory file) can be local or remote URIs
    localuri = '://' not in uri
    join = path.join if localuri else posixpath.join
try:
        if '://' in inv:
f = request.urlopen(inv)
else:
f = open(path.join(app.srcdir, inv), 'rb')
except Exception as err:
        app.warn('intersphinx inventory %r not fetchable due to '
                 '%s: %s' % (inv, err.__class__.__name__, err))
return
try:
line = f.readline().rstrip().decode('utf-8')
try:
if line == '# Sphinx inventory version 1':
invdata = read_inventory_v1(f, uri, join)
elif line == '# Sphinx inventory version 2':
invdata = read_inventory_v2(f, uri, join)
else:
raise ValueError
f.close()
except ValueError:
f.close()
raise ValueError('unknown or unsupported inventory version')
except Exception as err:
app.warn('intersphinx inventory %r not readable due to '
'%s: %s' % (inv, err.__class__.__name__, err))
else:
return invdata
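# Usage sketch (not part of the original module): fetch_inventory is from
# sphinx.ext.intersphinx, which also defines read_inventory_v1/v2 and
# imports `path` (os.path), `posixpath` and `request` (urllib.request on
# Python 3). Only app.srcdir and app.warn are used above, so a stand-in
# application object is enough to try it against a real inventory:
class _FakeApp(object):
    srcdir = '.'

    def warn(self, msg):
        print('WARNING: %s' % msg)


invdata = fetch_inventory(_FakeApp(), 'https://docs.python.org/3',
                          'https://docs.python.org/3/objects.inv')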
def urlretrieve(url, filename, reporthook=None, data=None):
"""Replacement for `urlretrive` for Python 2.
Under Python 2, `urlretrieve` relies on `FancyURLopener` from legacy
`urllib` module, known to have issues with proxy management.
Arguments:
url: url to retrieve.
filename: where to store the retrieved data locally.
reporthook: a hook function that will be called once
on establishment of the network connection and once
after each block read thereafter.
            The hook will be passed three arguments:
a count of blocks transferred so far,
a block size in bytes, and the total size of the file.
data: `data` argument passed to `urlopen`.
"""
    def chunk_read(response, chunk_size=8192, reporthook=None):
        # the Content-Length header gives the total size, if the server sent it
        content_length = response.info().get('Content-Length')
        total_size = -1
        if content_length is not None:
            total_size = int(content_length.strip())
        count = 0
        while True:
            chunk = response.read(chunk_size)
            count += 1
            if not chunk:
                # final call reports completion; guard against a None hook
                if reporthook:
                    reporthook(count, total_size, total_size)
                break
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk
response = urlopen(url, data)
with open(filename, 'wb') as fd:
for chunk in chunk_read(response, reporthook=reporthook):
fd.write(chunk)
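# A minimal sketch of the reporthook protocol documented above; the URL and
# filename are placeholders:
import sys


def _progress(count, block_size, total_size):
    if total_size > 0:
        done = min(count * block_size, total_size)
        sys.stdout.write('\rdownloaded %d of %d bytes' % (done, total_size))
        sys.stdout.flush()


urlretrieve('https://example.com/data.bin', 'data.bin', reporthook=_progress)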
def test_rfc6979(self):
text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
curve_tests = findall(r'curve: NIST P-192(.*)curve: NIST P-224', text, flags=DOTALL)[0]
q = int(findall(r'q = ([0-9A-F]*)', curve_tests)[0], 16)
x = int(findall(r'x = ([0-9A-F]*)', curve_tests)[0], 16)
test_regex = r'With SHA-(\d+), message = "([a-zA-Z]*)":\n' \
r'\s*k = ([0-9A-F]*)\n' \
r'\s*r = ([0-9A-F]*)\n' \
r'\s*s = ([0-9A-F]*)\n'
hash_lookup = {
'1': sha1,
'224': sha224,
'256': sha256,
'384': sha384,
'512': sha512
}
for test in findall(test_regex, curve_tests):
h = hash_lookup[test[0]]
msg = test[1]
k = int(test[2], 16)
r = int(test[3], 16)
s = int(test[4], 16)
self.assertEqual(k, RFC6979(msg, x, q, h).gen_nonce())
self.assertEqual((r, s), sign(msg, x, curve=P192, hashfunc=h))
def test_rfc6979(self):
text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
curve_tests = findall(r'curve: NIST P-224(.*)curve: NIST P-256', text, flags=DOTALL)[0]
q = int(findall(r'q = ([0-9A-F]*)', curve_tests)[0], 16)
x = int(findall(r'x = ([0-9A-F]*)', curve_tests)[0], 16)
test_regex = r'With SHA-(\d+), message = "([a-zA-Z]*)":\n' \
r'\s*k = ([0-9A-F]*)\n' \
r'\s*r = ([0-9A-F]*)\n' \
r'\s*s = ([0-9A-F]*)\n'
hash_lookup = {
'1': sha1,
'224': sha224,
'256': sha256,
'384': sha384,
'512': sha512
}
for test in findall(test_regex, curve_tests):
h = hash_lookup[test[0]]
msg = test[1]
k = int(test[2], 16)
r = int(test[3], 16)
s = int(test[4], 16)
self.assertEqual(k, RFC6979(msg, x, q, h).gen_nonce())
self.assertEqual((r, s), sign(msg, x, curve=P224, hashfunc=h))
def test_rfc6979(self):
text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
curve_tests = findall(r'curve: NIST P-256(.*)curve: NIST P-384', text, flags=DOTALL)[0]
q = int(findall(r'q = ([0-9A-F]*)', curve_tests)[0], 16)
x = int(findall(r'x = ([0-9A-F]*)', curve_tests)[0], 16)
test_regex = r'With SHA-(\d+), message = "([a-zA-Z]*)":\n' \
r'\s*k = ([0-9A-F]*)\n' \
r'\s*r = ([0-9A-F]*)\n' \
r'\s*s = ([0-9A-F]*)\n'
hash_lookup = {
'1': sha1,
'224': sha224,
'256': sha256,
'384': sha384,
'512': sha512
}
for test in findall(test_regex, curve_tests):
h = hash_lookup[test[0]]
msg = test[1]
k = int(test[2], 16)
r = int(test[3], 16)
s = int(test[4], 16)
self.assertEqual(k, RFC6979(msg, x, q, h).gen_nonce())
self.assertEqual((r, s), sign(msg, x, curve=P256, hashfunc=h))
def test_rfc6979(self):
text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
curve_tests = findall(r'curve: NIST P-384(.*)curve: NIST P-521', text, flags=DOTALL)[0]
q_parts = findall(r'q = ([0-9A-F]*)\n\s*([0-9A-F]*)', curve_tests)[0]
q = int(q_parts[0] + q_parts[1], 16)
x_parts = findall(r'x = ([0-9A-F]*)\n\s*([0-9A-F]*)', curve_tests)[0]
x = int(x_parts[0] + x_parts[1], 16)
test_regex = r'With SHA-(\d+), message = "([a-zA-Z]*)":\n' \
r'\s*k = ([0-9A-F]*)\n\s*([0-9A-F]*)\n' \
r'\s*r = ([0-9A-F]*)\n\s*([0-9A-F]*)\n' \
r'\s*s = ([0-9A-F]*)\n\s*([0-9A-F]*)\n'
hash_lookup = {
'1': sha1,
'224': sha224,
'256': sha256,
'384': sha384,
'512': sha512
}
for test in findall(test_regex, curve_tests):
h = hash_lookup[test[0]]
msg = test[1]
k = int(test[2] + test[3], 16)
r = int(test[4] + test[5], 16)
s = int(test[6] + test[7], 16)
self.assertEqual(k, RFC6979(msg, x, q, h).gen_nonce())
self.assertEqual((r, s), sign(msg, x, curve=P384, hashfunc=h))
def test_rfc6979(self):
text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
curve_tests = findall(r'curve: NIST P-521(.*)curve: NIST K-163', text, flags=DOTALL)[0]
q_parts = findall(r'q = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)', curve_tests)[0]
q = int(q_parts[0] + q_parts[1] + q_parts[2], 16)
x_parts = findall(r'x = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)', curve_tests)[0]
x = int(x_parts[0] + x_parts[1] + x_parts[2], 16)
test_regex = r'With SHA-(\d+), message = "([a-zA-Z]*)":\n' \
r'\s*k = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)\n' \
r'\s*r = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)\n' \
r'\s*s = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)\n'
hash_lookup = {
'1': sha1,
'224': sha224,
'256': sha256,
'384': sha384,
'512': sha512
}
for test in findall(test_regex, curve_tests):
h = hash_lookup[test[0]]
msg = test[1]
k = int(test[2] + test[3] + test[4], 16)
r = int(test[5] + test[6] + test[7], 16)
s = int(test[8] + test[9] + test[10], 16)
self.assertEqual(k, RFC6979(msg, x, q, h).gen_nonce())
self.assertEqual((r, s), sign(msg, x, curve=P521, hashfunc=h))
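# The five tests above differ only in their RFC 6979 section delimiters, the
# curve, and how many continuation lines each hex value spans in the RFC
# text. A sketch of one parameterized helper covering all of them; the
# fastecdsa-style module paths below are an assumption, only the
# RFC6979(...)/sign(...) call signatures are taken from the tests:
from hashlib import sha1, sha224, sha256, sha384, sha512
from re import findall, DOTALL
from urllib.request import urlopen

from fastecdsa.curve import P192, P224, P256, P384, P521  # assumed layout
from fastecdsa.ecdsa import sign                          # assumed layout
from fastecdsa.util import RFC6979                        # assumed layout


def _hexjoin(groups):
    # re.findall returns a plain string when the pattern has a single group
    if isinstance(groups, str):
        groups = (groups,)
    return int(''.join(groups), 16)


def check_rfc6979_section(text, start, end, parts, curve):
    section = findall(r'curve: %s(.*)curve: %s' % (start, end),
                      text, flags=DOTALL)[0]
    # a value spanning `parts` lines: first group plus continuation groups
    multi = r'([0-9A-F]*)' + r'\n\s*([0-9A-F]*)' * (parts - 1)
    q = _hexjoin(findall(r'q = ' + multi, section)[0])
    x = _hexjoin(findall(r'x = ' + multi, section)[0])
    test_regex = (r'With SHA-(\d+), message = "([a-zA-Z]*)":\n'
                  r'\s*k = ' + multi + r'\n'
                  r'\s*r = ' + multi + r'\n'
                  r'\s*s = ' + multi + r'\n')
    hash_lookup = {'1': sha1, '224': sha224, '256': sha256,
                   '384': sha384, '512': sha512}
    for test in findall(test_regex, section):
        h = hash_lookup[test[0]]
        msg = test[1]
        k = _hexjoin(test[2:2 + parts])
        r = _hexjoin(test[2 + parts:2 + 2 * parts])
        s = _hexjoin(test[2 + 2 * parts:])
        assert k == RFC6979(msg, x, q, h).gen_nonce()
        assert (r, s) == sign(msg, x, curve=curve, hashfunc=h)


text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
for start, end, parts, curve in [('NIST P-192', 'NIST P-224', 1, P192),
                                 ('NIST P-224', 'NIST P-256', 1, P224),
                                 ('NIST P-256', 'NIST P-384', 1, P256),
                                 ('NIST P-384', 'NIST P-521', 2, P384),
                                 ('NIST P-521', 'NIST K-163', 3, P521)]:
    check_rfc6979_section(text, start, end, parts, curve)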
def get_demand_price(aws_region, instance_type):
"""Get AWS instance demand price.
    Example: get_demand_price('us-east-1', 'm4.2xlarge') returns the
    current hourly on-demand Linux price in USD as a float.
    """
soup = BeautifulSoup(urlopen(EC2_INSTANCES_INFO_URL), 'html.parser')
table = soup.find('table', {'id': 'data'})
row = table.find(id=instance_type)
td = row.find('td', {'class': 'cost-ondemand-linux'})
region_prices = json.loads(td['data-pricing'])
return float(region_prices[aws_region])
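# Usage sketch: EC2_INSTANCES_INFO_URL is defined elsewhere in the module,
# and the table/data-pricing layout is specific to the page it points at,
# so this scraping breaks whenever that page's markup changes:
hourly_usd = get_demand_price('us-east-1', 'm4.2xlarge')
print('on-demand price: $%.4f/hour' % hourly_usd)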
def do_stack_adopt(hc, args):
'''Adopt a stack.'''
env_files, env = template_utils.process_multiple_environments_and_files(
env_paths=args.environment_file)
if not args.adopt_file:
raise exc.CommandError(_('Need to specify %(arg)s') %
{'arg': '--adopt-file'})
adopt_url = utils.normalise_file_path_to_url(args.adopt_file)
adopt_data = request.urlopen(adopt_url).read()
    if not adopt_data:
raise exc.CommandError('Invalid adopt-file, no data!')
if args.create_timeout:
logger.warning(_LW('%(arg1)s is deprecated, '
'please use %(arg2)s instead'),
{
'arg1': '-c/--create-timeout',
'arg2': '-t/--timeout'})
fields = {
'stack_name': args.name,
        'disable_rollback': not args.enable_rollback,
'adopt_stack_data': adopt_data,
'parameters': utils.format_parameters(args.parameters),
'files': dict(list(env_files.items())),
'environment': env
}
timeout = args.timeout or args.create_timeout
if timeout:
fields['timeout_mins'] = timeout
hc.stacks.create(**fields)
do_stack_list(hc)
def do_resource_signal(hc, args):
'''Send a signal to a resource.'''
fields = {'stack_id': args.id,
'resource_name': args.resource}
data = args.data
data_file = args.data_file
if data and data_file:
raise exc.CommandError(_('Can only specify one of data and data-file'))
if data_file:
data_url = utils.normalise_file_path_to_url(data_file)
data = request.urlopen(data_url).read()
if data:
if isinstance(data, six.binary_type):
data = data.decode('utf-8')
try:
data = jsonutils.loads(data)
except ValueError as ex:
raise exc.CommandError(_('Data should be in JSON format: %s') % ex)
if not isinstance(data, dict):
raise exc.CommandError(_('Data should be a JSON dict'))
fields['data'] = data
try:
hc.resources.signal(**fields)
except exc.HTTPNotFound:
raise exc.CommandError(_('Stack or resource not found: '
'%(id)s %(resource)s') %
{'id': args.id, 'resource': args.resource})
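# Hedged example of preparing a data file for this command: the payload is
# JSON-decoded above and forwarded to hc.resources.signal(). Which keys a
# resource honours depends on its type; these are typical for wait
# conditions:
import json

with open('signal.json', 'w') as f:
    json.dump({'status': 'SUCCESS', 'reason': 'setup finished'}, f)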
def do_config_create(hc, args):
'''Create a software configuration.'''
config = {
'group': args.group,
'config': ''
}
defn = {}
if args.definition_file:
defn_url = utils.normalise_file_path_to_url(
args.definition_file)
defn_raw = request.urlopen(defn_url).read() or '{}'
defn = yaml.load(defn_raw, Loader=template_format.yaml_loader)
config['inputs'] = defn.get('inputs', [])
config['outputs'] = defn.get('outputs', [])
config['options'] = defn.get('options', {})
if args.config_file:
config_url = utils.normalise_file_path_to_url(
args.config_file)
config['config'] = request.urlopen(config_url).read()
# build a mini-template with a config resource and validate it
validate_template = {
'heat_template_version': '2013-05-23',
'resources': {
args.name: {
'type': 'OS::Heat::SoftwareConfig',
'properties': config
}
}
}
hc.stacks.validate(template=validate_template)
config['name'] = args.name
sc = hc.software_configs.create(**config)
print(jsonutils.dumps(sc.to_dict(), indent=2))
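# Hedged example of a definition file this command accepts; the top-level
# inputs/outputs/options keys mirror the defn.get(...) calls above:
definition_yaml = """\
inputs:
  - name: host
  - name: port
outputs:
  - name: result
options: {}
"""
with open('config-definition.yaml', 'w') as f:
    f.write(definition_yaml)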
def _create_config(heat_client, args):
config = {
'group': args.group,
'config': ''
}
defn = {}
if args.definition_file:
defn_url = heat_utils.normalise_file_path_to_url(
args.definition_file)
defn_raw = request.urlopen(defn_url).read() or '{}'
defn = yaml.load(defn_raw, Loader=template_format.yaml_loader)
config['inputs'] = defn.get('inputs', [])
config['outputs'] = defn.get('outputs', [])
config['options'] = defn.get('options', {})
if args.config_file:
config_url = heat_utils.normalise_file_path_to_url(
args.config_file)
config['config'] = request.urlopen(config_url).read()
# build a mini-template with a config resource and validate it
validate_template = {
'heat_template_version': '2013-05-23',
'resources': {
args.name: {
'type': 'OS::Heat::SoftwareConfig',
'properties': config
}
}
}
heat_client.stacks.validate(template=validate_template)
config['name'] = args.name
sc = heat_client.software_configs.create(**config).to_dict()
rows = list(six.itervalues(sc))
columns = list(six.iterkeys(sc))
return columns, rows
def _resource_signal(heat_client, args):
fields = {'stack_id': args.stack,
'resource_name': args.resource}
data = args.data
data_file = args.data_file
if data and data_file:
raise exc.CommandError(_('Should only specify one of data or '
'data-file'))
if data_file:
data_url = heat_utils.normalise_file_path_to_url(data_file)
data = request.urlopen(data_url).read()
if data:
try:
data = jsonutils.loads(data)
except ValueError as ex:
raise exc.CommandError(_('Data should be in JSON format: %s') % ex)
if not isinstance(data, dict):
raise exc.CommandError(_('Data should be a JSON dict'))
fields['data'] = data
try:
heat_client.resources.signal(**fields)
except heat_exc.HTTPNotFound:
raise exc.CommandError(_('Stack %(stack)s or resource %(resource)s '
'not found.') %
{'stack': args.stack,
'resource': args.resource})
def take_action(self, parsed_args):
self.log.debug('take_action(%s)', parsed_args)
client = self.app.client_manager.orchestration
env_files, env = (
template_utils.process_multiple_environments_and_files(
env_paths=parsed_args.environment))
adopt_url = heat_utils.normalise_file_path_to_url(
parsed_args.adopt_file)
adopt_data = request.urlopen(adopt_url).read().decode('utf-8')
fields = {
'stack_name': parsed_args.name,
'disable_rollback': not parsed_args.enable_rollback,
'adopt_stack_data': adopt_data,
'parameters': heat_utils.format_parameters(parsed_args.parameter),
'files': dict(list(env_files.items())),
'environment': env,
'timeout': parsed_args.timeout
}
stack = client.stacks.create(**fields)['stack']
if parsed_args.wait:
stack_status, msg = event_utils.poll_for_events(
client, parsed_args.name, action='ADOPT')
if stack_status == 'ADOPT_FAILED':
raise exc.CommandError(msg)
return _show_stack(client, stack['id'], format='table', short=True)
def resolve(self, uri):
if uri.startswith('s3://'):
contents = self.get_s3_uri(uri)
else:
# TODO: in the case of file: content and untrusted
# third parties, uri would need sanitization
fh = urlopen(uri)
contents = fh.read().decode('utf-8')
fh.close()
self.cache.save(("uri-resolver", uri), contents)
return contents
def _get_json_events_from_wikipedia(month, date):
url = "{}{}_{}".format(URL_PREFIX, month, date)
data = urlopen(url).read().decode('utf-8')
return _parse_json(data)
def load_certificate(cert_url):
if not _valid_certificate_url(cert_url):
raise VerificationError("Certificate URL verification failed")
cert_data = urlopen(cert_url).read()
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_data)
if not _valid_certificate(cert):
raise VerificationError("Certificate verification failed")
return cert
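# A sketch of what _valid_certificate_url might check. Amazon's published
# requirements for Alexa signing-certificate URLs are https, host
# s3.amazonaws.com, a path starting with /echo.api/, and port 443 if one is
# given; whether this project applies exactly these rules is an assumption.
from six.moves.urllib.parse import urlparse


def _valid_certificate_url_sketch(cert_url):
    parsed = urlparse(cert_url)
    return (parsed.scheme == 'https' and
            parsed.hostname == 's3.amazonaws.com' and
            parsed.path.startswith('/echo.api/') and
            parsed.port in (None, 443))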
def get(url):
try:
return json.loads(urlopen(url).read())
except ValueError as e:
stop_err(str(e))
def post(api_key, url, data):
    url = make_url(api_key, url)
    # urlopen needs a bytes body on Python 3; json.dumps returns str
    req = Request(url, headers={'Content-Type': 'application/json'},
                  data=json.dumps(data).encode('utf-8'))
    return json.loads(urlopen(req).read())
def example_data():
if not os.path.isfile("cwl-example-data/chr22_cwl_test.cram"):
from six.moves.urllib.request import urlopen
import tarfile
print("Downloading and extracting cwl-example-data")
tgz = urlopen("https://cwl-example-data.cog.sanger.ac.uk/chr22_cwl_test.tgz")
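        # mode "r|gz" streams the gzip'd tar straight from the HTTP response,
        # so the archive is extracted without saving the .tgz to disk first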
tar = tarfile.open(fileobj=tgz, mode="r|gz")
tar.extractall(path="cwl-example-data")
tar.close()
tgz.close()