python类urlopen()的实例源码

intersphinx.py 文件源码 项目:chalktalk_docs 作者: loremIpsum1771 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def fetch_inventory(app, uri, inv):
    """Fetch, parse and return an intersphinx inventory file.

    Both *uri* (base URI of the links to generate) and *inv* (actual
    location of the inventory file) can be local or remote URIs.  A value
    containing ``://`` is treated as remote; a local *inv* is opened
    relative to ``app.srcdir``.

    Returns the parsed inventory data.  When the inventory cannot be opened
    or read, a warning is emitted through ``app.warn`` and ``None`` is
    returned instead.
    """
    # remote base URIs are joined with POSIX rules, local ones with the
    # rules of the running platform
    join = posixpath.join if '://' in uri else path.join
    try:
        if '://' in inv:
            f = request.urlopen(inv)
        else:
            f = open(path.join(app.srcdir, inv), 'rb')
    except Exception as err:
        app.warn('intersphinx inventory %r not fetchable due to '
                 '%s: %s' % (inv, err.__class__.__name__, err))
        return
    try:
        try:
            line = f.readline().rstrip().decode('utf-8')
            try:
                if line == '# Sphinx inventory version 1':
                    invdata = read_inventory_v1(f, uri, join)
                elif line == '# Sphinx inventory version 2':
                    invdata = read_inventory_v2(f, uri, join)
                else:
                    raise ValueError
            except ValueError:
                # a ValueError from the header check or from a parser is
                # reported as an unsupported inventory
                raise ValueError('unknown or unsupported inventory version')
        finally:
            # close the handle on every path, including unexpected errors
            f.close()
    except Exception as err:
        app.warn('intersphinx inventory %r not readable due to '
                 '%s: %s' % (inv, err.__class__.__name__, err))
    else:
        return invdata
tf-keras-skeleton.py 文件源码 项目:LIE 作者: EmbraceLife 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def urlretrieve(url, filename, reporthook=None, data=None):
    """Replacement for `urlretrieve` for Python 2.

    Under Python 2, `urlretrieve` relies on `FancyURLopener` from legacy
    `urllib` module, known to have issues with proxy management.

    Arguments:
        url: url to retrieve.
        filename: where to store the retrieved data locally.
        reporthook: optional hook function called after each block read
            and once more when the end of the stream is reached.
            The hook is passed three arguments: a count of read calls
            made so far, a block size in bytes, and the total size of
            the file (-1 when no `Content-Length` header is present).
            On the final call the block size argument equals the total
            size.
        data: `data` argument passed to `urlopen`.
    """

    def chunk_read(response, chunk_size=8192, reporthook=None):
        """Yield the response body in blocks, notifying `reporthook`."""
        content_length = response.info().get('Content-Length')
        total_size = -1
        if content_length is not None:
            total_size = int(content_length.strip())
        count = 0
        while True:
            chunk = response.read(chunk_size)
            count += 1
            if not chunk:
                # end of stream: the hook is optional, so only notify when
                # one was actually supplied
                if reporthook:
                    reporthook(count, total_size, total_size)
                break
            if reporthook:
                reporthook(count, chunk_size, total_size)
            yield chunk

    response = urlopen(url, data)
    try:
        with open(filename, 'wb') as fd:
            for chunk in chunk_read(response, reporthook=reporthook):
                fd.write(chunk)
    finally:
        # release the connection even when reading or writing fails
        response.close()
test.py 文件源码 项目:fastecdsa 作者: AntonKueltz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_rfc6979(self):
        """Check P-192 nonces and signatures against the RFC 6979 text.

        Downloads the RFC, cuts out the text between the P-192 and P-224
        headings, and for every hash/message vector found there compares
        the generated nonce and the produced signature with the listed
        values.
        """
        rfc_text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
        section = findall(r'curve: NIST P-192(.*)curve: NIST P-224', rfc_text, flags=DOTALL)[0]

        # hex-encoded q and x parameters listed for this curve
        q = int(findall(r'q = ([0-9A-F]*)', section)[0], 16)
        x = int(findall(r'x = ([0-9A-F]*)', section)[0], 16)

        vector_pattern = (r'With SHA-(\d+), message = "([a-zA-Z]*)":\n'
                          r'\s*k = ([0-9A-F]*)\n'
                          r'\s*r = ([0-9A-F]*)\n'
                          r'\s*s = ([0-9A-F]*)\n')

        # maps the digest size printed in the RFC to the hash constructor
        digests = {'1': sha1, '224': sha224, '256': sha256, '384': sha384, '512': sha512}

        for bits, msg, k_hex, r_hex, s_hex in findall(vector_pattern, section):
            hashfunc = digests[bits]
            expected_nonce = int(k_hex, 16)
            expected_r = int(r_hex, 16)
            expected_s = int(s_hex, 16)

            self.assertEqual(expected_nonce, RFC6979(msg, x, q, hashfunc).gen_nonce())
            self.assertEqual((expected_r, expected_s),
                             sign(msg, x, curve=P192, hashfunc=hashfunc))
test.py 文件源码 项目:fastecdsa 作者: AntonKueltz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_rfc6979(self):
        """Check P-224 nonces and signatures against the RFC 6979 text.

        Downloads the RFC, cuts out the text between the P-224 and P-256
        headings, and for every hash/message vector found there compares
        the generated nonce and the produced signature with the listed
        values.
        """
        rfc_text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
        section = findall(r'curve: NIST P-224(.*)curve: NIST P-256', rfc_text, flags=DOTALL)[0]

        # hex-encoded q and x parameters listed for this curve
        q = int(findall(r'q = ([0-9A-F]*)', section)[0], 16)
        x = int(findall(r'x = ([0-9A-F]*)', section)[0], 16)

        vector_pattern = (r'With SHA-(\d+), message = "([a-zA-Z]*)":\n'
                          r'\s*k = ([0-9A-F]*)\n'
                          r'\s*r = ([0-9A-F]*)\n'
                          r'\s*s = ([0-9A-F]*)\n')

        # maps the digest size printed in the RFC to the hash constructor
        digests = {'1': sha1, '224': sha224, '256': sha256, '384': sha384, '512': sha512}

        for bits, msg, k_hex, r_hex, s_hex in findall(vector_pattern, section):
            hashfunc = digests[bits]
            expected_nonce = int(k_hex, 16)
            expected_r = int(r_hex, 16)
            expected_s = int(s_hex, 16)

            self.assertEqual(expected_nonce, RFC6979(msg, x, q, hashfunc).gen_nonce())
            self.assertEqual((expected_r, expected_s),
                             sign(msg, x, curve=P224, hashfunc=hashfunc))
test.py 文件源码 项目:fastecdsa 作者: AntonKueltz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_rfc6979(self):
        """Check P-256 nonces and signatures against the RFC 6979 text.

        Downloads the RFC, cuts out the text between the P-256 and P-384
        headings, and for every hash/message vector found there compares
        the generated nonce and the produced signature with the listed
        values.
        """
        rfc_text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
        section = findall(r'curve: NIST P-256(.*)curve: NIST P-384', rfc_text, flags=DOTALL)[0]

        # hex-encoded q and x parameters listed for this curve
        q = int(findall(r'q = ([0-9A-F]*)', section)[0], 16)
        x = int(findall(r'x = ([0-9A-F]*)', section)[0], 16)

        vector_pattern = (r'With SHA-(\d+), message = "([a-zA-Z]*)":\n'
                          r'\s*k = ([0-9A-F]*)\n'
                          r'\s*r = ([0-9A-F]*)\n'
                          r'\s*s = ([0-9A-F]*)\n')

        # maps the digest size printed in the RFC to the hash constructor
        digests = {'1': sha1, '224': sha224, '256': sha256, '384': sha384, '512': sha512}

        for bits, msg, k_hex, r_hex, s_hex in findall(vector_pattern, section):
            hashfunc = digests[bits]
            expected_nonce = int(k_hex, 16)
            expected_r = int(r_hex, 16)
            expected_s = int(s_hex, 16)

            self.assertEqual(expected_nonce, RFC6979(msg, x, q, hashfunc).gen_nonce())
            self.assertEqual((expected_r, expected_s),
                             sign(msg, x, curve=P256, hashfunc=hashfunc))
test.py 文件源码 项目:fastecdsa 作者: AntonKueltz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_rfc6979(self):
        """Check P-384 nonces and signatures against the RFC 6979 text.

        Downloads the RFC and cuts out the text between the P-384 and
        P-521 headings.  Every hex value in this section is matched as two
        consecutive lines, so the two captured pieces are concatenated
        before conversion.
        """
        rfc_text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
        section = findall(r'curve: NIST P-384(.*)curve: NIST P-521', rfc_text, flags=DOTALL)[0]

        # each match is a 2-tuple of hex fragments; glue them back together
        q_pieces = findall(r'q = ([0-9A-F]*)\n\s*([0-9A-F]*)', section)[0]
        q = int(''.join(q_pieces), 16)
        x_pieces = findall(r'x = ([0-9A-F]*)\n\s*([0-9A-F]*)', section)[0]
        x = int(''.join(x_pieces), 16)

        vector_pattern = (r'With SHA-(\d+), message = "([a-zA-Z]*)":\n'
                          r'\s*k = ([0-9A-F]*)\n\s*([0-9A-F]*)\n'
                          r'\s*r = ([0-9A-F]*)\n\s*([0-9A-F]*)\n'
                          r'\s*s = ([0-9A-F]*)\n\s*([0-9A-F]*)\n')

        # maps the digest size printed in the RFC to the hash constructor
        digests = {'1': sha1, '224': sha224, '256': sha256, '384': sha384, '512': sha512}

        for bits, msg, k1, k2, r1, r2, s1, s2 in findall(vector_pattern, section):
            hashfunc = digests[bits]
            expected_nonce = int(k1 + k2, 16)
            expected_r = int(r1 + r2, 16)
            expected_s = int(s1 + s2, 16)

            self.assertEqual(expected_nonce, RFC6979(msg, x, q, hashfunc).gen_nonce())
            self.assertEqual((expected_r, expected_s),
                             sign(msg, x, curve=P384, hashfunc=hashfunc))
test.py 文件源码 项目:fastecdsa 作者: AntonKueltz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_rfc6979(self):
        """Check P-521 nonces and signatures against the RFC 6979 text.

        Downloads the RFC and cuts out the text between the P-521 and
        K-163 headings.  Every hex value in this section is matched as
        three consecutive lines, so the three captured pieces are
        concatenated before conversion.
        """
        rfc_text = urlopen('https://tools.ietf.org/rfc/rfc6979.txt').read().decode()
        section = findall(r'curve: NIST P-521(.*)curve: NIST K-163', rfc_text, flags=DOTALL)[0]

        # each match is a 3-tuple of hex fragments; glue them back together
        q_pieces = findall(r'q = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)', section)[0]
        q = int(''.join(q_pieces), 16)
        x_pieces = findall(r'x = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)', section)[0]
        x = int(''.join(x_pieces), 16)

        vector_pattern = (r'With SHA-(\d+), message = "([a-zA-Z]*)":\n'
                          r'\s*k = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)\n'
                          r'\s*r = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)\n'
                          r'\s*s = ([0-9A-F]*)\n\s*([0-9A-F]*)\n\s*([0-9A-F]*)\n')

        # maps the digest size printed in the RFC to the hash constructor
        digests = {'1': sha1, '224': sha224, '256': sha256, '384': sha384, '512': sha512}

        for bits, msg, k1, k2, k3, r1, r2, r3, s1, s2, s3 in findall(vector_pattern, section):
            hashfunc = digests[bits]
            expected_nonce = int(k1 + k2 + k3, 16)
            expected_r = int(r1 + r2 + r3, 16)
            expected_s = int(s1 + s2 + s3, 16)

            self.assertEqual(expected_nonce, RFC6979(msg, x, q, hashfunc).gen_nonce())
            self.assertEqual((expected_r, expected_s),
                             sign(msg, x, curve=P521, hashfunc=hashfunc))
pricing.py 文件源码 项目:sparksteps 作者: jwplayer 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_demand_price(aws_region, instance_type):
    """Return the on-demand Linux price of an instance type in a region.

    The page at ``EC2_INSTANCES_INFO_URL`` is downloaded and parsed.  The
    price is read from the JSON stored in the ``data-pricing`` attribute
    of the ``cost-ondemand-linux`` cell of the row whose id is
    *instance_type*, inside the table with id ``data``.  The entry keyed
    by *aws_region* is returned as a float.

    >>> print(get_demand_price('us-east-1', 'm4.2xlarge'))
    """
    page = urlopen(EC2_INSTANCES_INFO_URL)
    document = BeautifulSoup(page, 'html.parser')
    data_table = document.find('table', {'id': 'data'})
    instance_row = data_table.find(id=instance_type)
    price_cell = instance_row.find('td', {'class': 'cost-ondemand-linux'})
    prices_by_region = json.loads(price_cell['data-pricing'])
    return float(prices_by_region[aws_region])
shell.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def do_stack_adopt(hc, args):
    '''Adopt a stack from the data referenced by ``args.adopt_file``.

    Raises ``exc.CommandError`` when no adopt file is given or when the
    file is empty.  After the create call the stack list is printed via
    ``do_stack_list``.
    '''
    env_files, env = template_utils.process_multiple_environments_and_files(
        env_paths=args.environment_file)

    if not args.adopt_file:
        raise exc.CommandError(_('Need to specify %(arg)s') %
                               {'arg': '--adopt-file'})

    adopt_data = request.urlopen(
        utils.normalise_file_path_to_url(args.adopt_file)).read()
    if len(adopt_data) == 0:
        raise exc.CommandError('Invalid adopt-file, no data!')

    if args.create_timeout:
        # the old option still works but users are steered to the new one
        option_names = {'arg1': '-c/--create-timeout',
                        'arg2': '-t/--timeout'}
        logger.warning(_LW('%(arg1)s is deprecated, '
                           'please use %(arg2)s instead'),
                       option_names)

    create_kwargs = {
        'stack_name': args.name,
        'disable_rollback': not args.enable_rollback,
        'adopt_stack_data': adopt_data,
        'parameters': utils.format_parameters(args.parameters),
        'files': dict(list(env_files.items())),
        'environment': env
    }

    # --timeout wins over the deprecated --create-timeout
    timeout = args.timeout or args.create_timeout
    if timeout:
        create_kwargs['timeout_mins'] = timeout

    hc.stacks.create(**create_kwargs)
    do_stack_list(hc)
shell.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def do_resource_signal(hc, args):
    '''Send a signal to a resource, optionally with a JSON dict payload.

    The payload comes either from ``args.data`` or from the file named by
    ``args.data_file``; giving both raises ``exc.CommandError``, as does a
    payload that is not valid JSON or not a JSON dict.
    '''
    signal_kwargs = {'stack_id': args.id,
                     'resource_name': args.resource}
    payload = args.data
    payload_file = args.data_file
    if payload and payload_file:
        raise exc.CommandError(_('Can only specify one of data and data-file'))
    if payload_file:
        payload = request.urlopen(
            utils.normalise_file_path_to_url(payload_file)).read()
    if payload:
        # content read from a URL arrives as bytes
        if isinstance(payload, six.binary_type):
            payload = payload.decode('utf-8')
        try:
            payload = jsonutils.loads(payload)
        except ValueError as ex:
            raise exc.CommandError(_('Data should be in JSON format: %s') % ex)
        if not isinstance(payload, dict):
            raise exc.CommandError(_('Data should be a JSON dict'))
        signal_kwargs['data'] = payload
    try:
        hc.resources.signal(**signal_kwargs)
    except exc.HTTPNotFound:
        not_found_args = {'id': args.id, 'resource': args.resource}
        raise exc.CommandError(_('Stack or resource not found: '
                                 '%(id)s %(resource)s') %
                               not_found_args)
shell.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def do_config_create(hc, args):
    '''Create a software configuration and print it as JSON.

    Inputs, outputs and options are taken from the optional definition
    file; the config body from the optional config file.  Before the
    config is created it is validated inside a one-resource template.
    '''
    definition = {}
    if args.definition_file:
        definition_url = utils.normalise_file_path_to_url(
            args.definition_file)
        # an empty definition file is treated as an empty mapping
        definition_text = request.urlopen(definition_url).read() or '{}'
        definition = yaml.load(definition_text,
                               Loader=template_format.yaml_loader)

    config = {
        'group': args.group,
        'config': '',
        'inputs': definition.get('inputs', []),
        'outputs': definition.get('outputs', []),
        'options': definition.get('options', {})
    }

    if args.config_file:
        config['config'] = request.urlopen(
            utils.normalise_file_path_to_url(args.config_file)).read()

    # build a mini-template with a config resource and validate it
    config_resource = {
        'type': 'OS::Heat::SoftwareConfig',
        'properties': config
    }
    hc.stacks.validate(template={
        'heat_template_version': '2013-05-23',
        'resources': {args.name: config_resource}
    })

    # the name is added only after validation
    config['name'] = args.name

    created = hc.software_configs.create(**config)
    print(jsonutils.dumps(created.to_dict(), indent=2))
software_config.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _create_config(heat_client, args):
    """Create a software configuration and return it as columns and rows.

    Inputs, outputs and options are taken from the optional definition
    file; the config body from the optional config file.  Before the
    config is created it is validated inside a one-resource template.

    Returns a ``(columns, rows)`` pair built from the keys and values of
    the created config's dict form.
    """
    definition = {}
    if args.definition_file:
        definition_url = heat_utils.normalise_file_path_to_url(
            args.definition_file)
        # an empty definition file is treated as an empty mapping
        definition_text = request.urlopen(definition_url).read() or '{}'
        definition = yaml.load(definition_text,
                               Loader=template_format.yaml_loader)

    config = {
        'group': args.group,
        'config': '',
        'inputs': definition.get('inputs', []),
        'outputs': definition.get('outputs', []),
        'options': definition.get('options', {})
    }

    if args.config_file:
        config['config'] = request.urlopen(
            heat_utils.normalise_file_path_to_url(args.config_file)).read()

    # build a mini-template with a config resource and validate it
    config_resource = {
        'type': 'OS::Heat::SoftwareConfig',
        'properties': config
    }
    heat_client.stacks.validate(template={
        'heat_template_version': '2013-05-23',
        'resources': {args.name: config_resource}
    })

    # the name is added only after validation
    config['name'] = args.name
    created = heat_client.software_configs.create(**config).to_dict()
    rows = list(six.itervalues(created))
    columns = list(six.iterkeys(created))
    return columns, rows
resource.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _resource_signal(heat_client, args):
    """Send a signal to a resource, optionally with a JSON dict payload.

    The payload comes either from ``args.data`` or from the file named by
    ``args.data_file``; giving both raises ``exc.CommandError``, as does a
    payload that is not valid JSON or not a JSON dict.  A not-found
    response from the service is also reported as ``exc.CommandError``.
    """
    signal_kwargs = {'stack_id': args.stack,
                     'resource_name': args.resource}
    payload = args.data
    payload_file = args.data_file
    if payload and payload_file:
        raise exc.CommandError(_('Should only specify one of data or '
                                 'data-file'))

    if payload_file:
        payload = request.urlopen(
            heat_utils.normalise_file_path_to_url(payload_file)).read()

    if payload:
        try:
            payload = jsonutils.loads(payload)
        except ValueError as ex:
            raise exc.CommandError(_('Data should be in JSON format: %s') % ex)
        if not isinstance(payload, dict):
            raise exc.CommandError(_('Data should be a JSON dict'))

        signal_kwargs['data'] = payload
    try:
        heat_client.resources.signal(**signal_kwargs)
    except heat_exc.HTTPNotFound:
        not_found_args = {'stack': args.stack,
                          'resource': args.resource}
        raise exc.CommandError(_('Stack %(stack)s or resource %(resource)s '
                                 'not found.') %
                               not_found_args)
stack.py 文件源码 项目:eclcli 作者: nttcom 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def take_action(self, parsed_args):
        """Adopt a stack from ``parsed_args.adopt_file`` and show the result.

        When ``parsed_args.wait`` is set, events are polled after the
        create call and ``exc.CommandError`` is raised if the final status
        is ``ADOPT_FAILED``.  Returns whatever ``_show_stack`` returns for
        the new stack.
        """
        self.log.debug('take_action(%s)', parsed_args)

        heat = self.app.client_manager.orchestration

        env_files, env = template_utils.process_multiple_environments_and_files(
            env_paths=parsed_args.environment)

        adopt_response = request.urlopen(
            heat_utils.normalise_file_path_to_url(parsed_args.adopt_file))
        adopt_data = adopt_response.read().decode('utf-8')

        create_kwargs = {
            'stack_name': parsed_args.name,
            'disable_rollback': not parsed_args.enable_rollback,
            'adopt_stack_data': adopt_data,
            'parameters': heat_utils.format_parameters(parsed_args.parameter),
            'files': dict(list(env_files.items())),
            'environment': env,
            'timeout': parsed_args.timeout
        }

        stack = heat.stacks.create(**create_kwargs)['stack']

        if parsed_args.wait:
            final_status, message = event_utils.poll_for_events(
                heat, parsed_args.name, action='ADOPT')
            if final_status == 'ADOPT_FAILED':
                raise exc.CommandError(message)

        return _show_stack(heat, stack['id'], format='table', short=True)
resolver.py 文件源码 项目:cloud-custodian 作者: capitalone 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def resolve(self, uri):
        """Return the contents found at *uri* and store them in the cache.

        ``s3://`` URIs are delegated to ``self.get_s3_uri``; anything else
        is opened with ``urlopen`` and decoded as UTF-8.  The result is
        saved in ``self.cache`` under the key ``("uri-resolver", uri)``
        before being returned.
        """
        if uri.startswith('s3://'):
            contents = self.get_s3_uri(uri)
        else:
            # TODO: in the case of file: content and untrusted
            # third parties, uri would need sanitization
            fh = urlopen(uri)
            try:
                contents = fh.read().decode('utf-8')
            finally:
                # release the handle even when reading or decoding fails
                fh.close()
        self.cache.save(("uri-resolver", uri), contents)
        return contents
historybuff.py 文件源码 项目:flask-ask 作者: johnwheeler 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _get_json_events_from_wikipedia(month, date):
    """Download the page for *month* and *date* and parse it.

    The URL is ``URL_PREFIX`` followed by ``<month>_<date>``; the body is
    decoded as UTF-8 and handed to ``_parse_json``.
    """
    response = urlopen("{}{}_{}".format(URL_PREFIX, month, date))
    raw_json = response.read().decode('utf-8')
    return _parse_json(raw_json)
verifier.py 文件源码 项目:flask-ask 作者: johnwheeler 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_certificate(cert_url):
    """Download the PEM certificate at *cert_url* and return it.

    Raises ``VerificationError`` when the URL is rejected by
    ``_valid_certificate_url`` (checked before anything is downloaded) or
    when the loaded certificate is rejected by ``_valid_certificate``.
    """
    url_is_acceptable = _valid_certificate_url(cert_url)
    if not url_is_acceptable:
        raise VerificationError("Certificate URL verification failed")

    pem_bytes = urlopen(cert_url).read()
    certificate = crypto.load_certificate(crypto.FILETYPE_PEM, pem_bytes)

    certificate_is_acceptable = _valid_certificate(certificate)
    if not certificate_is_acceptable:
        raise VerificationError("Certificate verification failed")
    return certificate
api_util.py 文件源码 项目:cegr-galaxy 作者: seqcode 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get(url):
    """Fetch *url* and return its body parsed as JSON.

    A ``ValueError`` raised while fetching or parsing is passed, as text,
    to ``stop_err``.
    """
    try:
        body = urlopen(url).read()
        parsed = json.loads(body)
        return parsed
    except ValueError as e:
        stop_err(str(e))
stats_util.py 文件源码 项目:cegr-galaxy 作者: seqcode 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def post(api_key, url, data):
    """Send *data* as a JSON request body and return the parsed JSON reply.

    The target URL is built by ``make_url`` from *api_key* and *url*.
    """
    req = Request(make_url(api_key, url),
                  headers={'Content-Type': 'application/json'},
                  data=json.dumps(data))
    reply_body = urlopen(req).read()
    return json.loads(reply_body)
test.py 文件源码 项目:gatk-cwl-generator 作者: wtsi-hgi 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def example_data():
    """Download and unpack the CWL example data unless it is already there.

    The presence of ``cwl-example-data/chr22_cwl_test.cram`` is used as
    the marker that the archive was extracted before.
    """
    if os.path.isfile("cwl-example-data/chr22_cwl_test.cram"):
        return

    from six.moves.urllib.request import urlopen
    import tarfile
    print("Downloading and extracting cwl-example-data")
    download = urlopen("https://cwl-example-data.cog.sanger.ac.uk/chr22_cwl_test.tgz")
    # "r|gz" reads the gzip archive as a stream straight from the response
    archive = tarfile.open(fileobj=download, mode="r|gz")
    archive.extractall(path="cwl-example-data")
    archive.close()
    download.close()


问题


面经


文章

微信
公众号

扫码关注公众号