python类OK的实例源码

test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_validate_image_status_before_upload_ok_v1(self):
        """An OK response reporting 'queued' passes the v1 validation.

        The retry helper is patched out and must not be invoked.
        """
        retry_stub = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')
        image_url = 'http://fake_host/fake_path/fake_image_id'

        # Canned response: OK status, 'queued' from every header lookup.
        head_resp = mock.Mock()
        head_resp.getheader.return_value = 'queued'
        head_resp.read.return_value = 'fakeData'
        head_resp.status = httplib.OK

        conn = mock.Mock()
        conn.getresponse.return_value = head_resp

        self.glance.validate_image_status_before_upload_v1(
            conn, image_url, extra_headers=mock.Mock())

        self.assertTrue(conn.getresponse.called)
        self.assertEqual(head_resp.read.call_count, 2)
        self.assertFalse(retry_stub.called)
test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_validate_image_status_before_upload_req_head_exception_v1(self):
        """A failing request surfaces as RetryableError.

        No response may be fetched or read once the request has failed.
        """
        image_url = 'http://fake_host/fake_path/fake_image_id'

        head_resp = mock.Mock()
        head_resp.getheader.return_value = 'queued'
        head_resp.read.return_value = 'fakeData'
        head_resp.status = httplib.OK

        conn = mock.Mock()
        conn.getresponse.return_value = head_resp
        # The request call itself raises, before any response exists.
        conn.request.side_effect = Fake_HTTP_Request_Error()

        self.assertRaises(self.glance.RetryableError,
                          self.glance.validate_image_status_before_upload_v1,
                          conn, image_url, extra_headers=mock.Mock())
        conn.request.assert_called_once()
        head_resp.read.assert_not_called()
        conn.getresponse.assert_not_called()
test_glance.py 文件源码 项目:os-xenapi 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_validate_image_status_before_upload_ok_v2_using_uwsgi(self):
        """An OK response whose body reports 'queued' passes v2 validation.

        Also checks that the last request issued was a GET on the patch
        path carrying the supplied extra headers.
        """
        retry_stub = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')
        image_url = 'http://fake_host/fake_path/fake_image_id'
        patch_path = 'fake_patch_path'
        extra_headers = mock.Mock()

        # Canned response: OK status with a JSON body.
        head_resp = mock.Mock()
        head_resp.read.return_value = '{"status": "queued"}'
        head_resp.status = httplib.OK

        conn = mock.Mock()
        conn.getresponse.return_value = head_resp

        self.glance.validate_image_status_before_upload_v2(
            conn, image_url, extra_headers, patch_path)

        self.assertTrue(conn.getresponse.called)
        self.assertEqual(head_resp.read.call_count, 2)
        self.assertFalse(retry_stub.called)
        conn.request.assert_called_with('GET',
                                        patch_path,
                                        headers=extra_headers)
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: forwarded to open_url as its data argument
    @param handlers: forwarded to open_url as its handlers argument
    @return: data read from the response
    @raise URLFetchError: if the returned status code is not OK; the
    exception carries the message returned by open_url
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code != http_client_.OK:
        raise URLFetchError(return_message)

    try:
        return response.read()
    finally:
        # Close the response even when read() fails part-way through.
        response.close()
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: forwarded to open_url as its data argument
    @param handlers: forwarded to open_url as its handlers argument
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        try:
            return_data = response.read()
        finally:
            # Close the response even when read() fails.
            response.close()
        # The context manager closes the file even if write() raises.
        # NOTE(review): text mode "w" is kept from the original; if read()
        # returns bytes (Python 3) this probably needs "wb" - confirm.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)

    return return_code, return_message, succeeded
utils.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Opens a URL and hands back the response object itself.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: the response returned by open_url, left unread and open
    @rtype: file derived type
    @raise URLFetchError: if the returned status code is missing or not OK
    """
    status, message, stream = open_url(url, config, data=data,
                                       handlers=handlers)
    # Guard clause: anything other than a truthy OK status is an error.
    if not status or status != http_client_.OK:
        raise URLFetchError(message)
    return stream
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def Delete(
    name,
    creds,
    transport
):
  """Issue a DELETE for the manifest of a tag or digest.

  Args:
    name: a tag or digest to be deleted.
    creds: the creds to use for deletion.
    transport: the transport to use to contact the registry.
  """
  session = docker_http.Transport(
      name, creds, transport, docker_http.DELETE)

  manifest_url = (
      '{scheme}://{registry}/v2/{repository}/manifests/{entity}'.format(
          scheme=docker_http.Scheme(name.registry),
          registry=name.registry,
          repository=name.repository,
          entity=_tag_or_digest(name)))

  # OK and ACCEPTED are the accepted codes; the returned pair is not used.
  _resp, _content = session.Request(
      manifest_url,
      method='DELETE',
      accepted_codes=[httplib.OK, httplib.ACCEPTED])
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _content(
      self,
      suffix,
      accepted_mimes=None,
      cache=True
  ):
    """Fetch a /v2/ resource from the registry, reusing stored responses.

    Args:
      suffix: path placed after /v2/; prefixed with the repository when
        the name is a docker_name.Repository.
      accepted_mimes: forwarded to the transport's Request call.
      cache: when true, keep the fetched content for later calls.

    Returns:
      The content returned by the transport, or the stored copy when this
      suffix has been stored before.
    """
    target = self._name
    if isinstance(target, docker_name.Repository):
      suffix = '{repository}/{suffix}'.format(
          repository=target.repository, suffix=suffix)

    stored = self._response
    if suffix in stored:
      return stored[suffix]

    url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(target.registry),
        registry=target.registry,
        suffix=suffix)
    _, body = self._transport.Request(
        url,
        accepted_codes=[httplib.OK],
        accepted_mimes=accepted_mimes)

    if cache:
      stored[suffix] = body
    return body
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def blob_size(self, digest):
    """Return the blob's content-length, as an int, from a HEAD request."""
    path = 'blobs/' + digest
    target = self._name
    if isinstance(target, docker_name.Repository):
      path = '{repository}/{suffix}'.format(
          repository=target.repository, suffix=path)

    url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(target.registry),
        registry=target.registry,
        suffix=path)

    # Only the response headers matter here; the content is ignored.
    headers, _ = self._transport.Request(
        url, method='HEAD', accepted_codes=[httplib.OK])
    return int(headers['content-length'])

  # Large, do not memoize.
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def catalog(self, page_size=100):
    """Yield each entry of 'repositories' from the paginated /v2/_catalog.

    Args:
      page_size: value sent as the n query parameter.

    Raises:
      ValueError: if the name is a docker_name.Repository.
      docker_http.BadStateException: if a page lacks a 'repositories' key.
    """
    # TODO(user): Handle docker_name.Repository for /v2/<name>/_catalog
    target = self._name
    if isinstance(target, docker_name.Repository):
      raise ValueError('Expected docker_name.Registry for "name"')

    first_page = '{scheme}://{registry}/v2/_catalog?n={page_size}'.format(
        scheme=docker_http.Scheme(target.registry),
        registry=target.registry,
        page_size=page_size)
    pages = self._transport.PaginatedRequest(
        first_page, accepted_codes=[httplib.OK])

    for _, body in pages:
      parsed = json.loads(body)
      if 'repositories' not in parsed:
        raise docker_http.BadStateException(
            'Malformed JSON response: %s' % body)

      # TODO(user): This should return docker_name.Repository
      for repository in parsed['repositories']:
        yield repository

  # __enter__ and __exit__ allow use as a context manager.
docker_image_list_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _content(
      self,
      suffix,
      accepted_mimes=None,
      cache=True
  ):
    """Fetch a /v2/ resource from the registry, reusing stored responses.

    Args:
      suffix: path placed after /v2/; prefixed with the repository when
        the name is a docker_name.Repository.
      accepted_mimes: forwarded to the transport's Request call.
      cache: when true, keep the fetched content for later calls.

    Returns:
      The content returned by the transport, or the stored copy when this
      suffix has been stored before.
    """
    target = self._name
    if isinstance(target, docker_name.Repository):
      suffix = '{repository}/{suffix}'.format(
          repository=target.repository, suffix=suffix)

    stored = self._response
    if suffix in stored:
      return stored[suffix]

    url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(target.registry),
        registry=target.registry,
        suffix=suffix)
    _, body = self._transport.Request(
        url,
        accepted_codes=[httplib.OK],
        accepted_mimes=accepted_mimes)

    if cache:
      stored[suffix] = body
    return body
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _put_layer(
      self,
      image,
      layer_id
  ):
    """Send one layer's data to the /v1/images/<layer>/layer endpoint.

    Args:
      image: object whose layer(layer_id) result is used as request body.
      layer_id: id of the layer to upload.
    """
    upload_url = '{scheme}://{endpoint}/v1/images/{layer}/layer'.format(
        scheme=docker_http.Scheme(self._endpoint),
        endpoint=self._endpoint,
        layer=layer_id)

    # TODO(user): We should stream this instead of loading
    # it into memory.
    docker_http.Request(
        self._transport,
        upload_url,
        self._token_creds,
        accepted_codes=[httplib.OK],
        body=image.layer(layer_id),
        content_type='application/octet-stream')
docker_session_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def Delete(
    name,
    creds,
    transport
):
  """Issue a DELETE for the manifest of a tag or digest.

  Args:
    name: a tag or digest to be deleted.
    creds: the credentials to use for deletion.
    transport: the transport to use to contact the registry.
  """
  session = docker_http.Transport(
      name, creds, transport, docker_http.DELETE)

  manifest_url = (
      '{scheme}://{registry}/v2/{repository}/manifests/{entity}'.format(
          scheme=docker_http.Scheme(name.registry),
          registry=name.registry,
          repository=name.repository,
          entity=_tag_or_digest(name)))

  # OK and ACCEPTED are the accepted codes; the returned pair is not used.
  _resp, _content = session.Request(
      manifest_url,
      method='DELETE',
      accepted_codes=[httplib.OK, httplib.ACCEPTED])
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _content(self, suffix, cache=True):
    """Fetch a /v2/ resource from the registry, reusing stored responses.

    Args:
      suffix: path placed after /v2/; prefixed with the repository when
        the name is a docker_name.Repository.
      cache: when true, keep the fetched content for later calls.

    Returns:
      The content returned by the transport, or the stored copy when this
      suffix has been stored before.
    """
    target = self._name
    if isinstance(target, docker_name.Repository):
      suffix = '{repository}/{suffix}'.format(
          repository=target.repository, suffix=suffix)

    stored = self._response
    if suffix in stored:
      return stored[suffix]

    url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(target.registry),
        registry=target.registry,
        suffix=suffix)
    _, body = self._transport.Request(url, accepted_codes=[httplib.OK])

    if cache:
      stored[suffix] = body
    return body
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def blob_size(self, digest):
    """Return the blob's content-length, as an int, from a HEAD request."""
    path = 'blobs/' + digest
    target = self._name
    if isinstance(target, docker_name.Repository):
      path = '{repository}/{suffix}'.format(
          repository=target.repository, suffix=path)

    url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(target.registry),
        registry=target.registry,
        suffix=path)

    # Only the response headers matter here; the content is ignored.
    headers, _ = self._transport.Request(
        url, method='HEAD', accepted_codes=[httplib.OK])
    return int(headers['content-length'])

  # Large, do not memoize.
docker_image_.py 文件源码 项目:containerregistry 作者: google 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def catalog(self, page_size=100):
    """Yield each entry of 'repositories' from the paginated /v2/_catalog.

    Args:
      page_size: value sent as the n query parameter.

    Raises:
      ValueError: if the name is a docker_name.Repository.
      docker_http.BadStateException: if a page lacks a 'repositories' key.
    """
    # TODO(user): Handle docker_name.Repository for /v2/<name>/_catalog
    target = self._name
    if isinstance(target, docker_name.Repository):
      raise ValueError('Expected docker_name.Registry for "name"')

    first_page = '{scheme}://{registry}/v2/_catalog?n={page_size}'.format(
        scheme=docker_http.Scheme(target.registry),
        registry=target.registry,
        page_size=page_size)
    pages = self._transport.PaginatedRequest(
        first_page, accepted_codes=[httplib.OK])

    for _, body in pages:
      parsed = json.loads(body)
      if 'repositories' not in parsed:
        raise docker_http.BadStateException(
            'Malformed JSON response: %s' % body)

      for repository in parsed['repositories']:
        # TODO(user): This should return docker_name.Repository instead.
        yield repository

  # __enter__ and __exit__ allow use as a context manager.
test_ApiCalls.py 文件源码 项目:irida-miseq-uploader 作者: phac-nml 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_validate_URL_existence_url_ok(self, mock_cs, mock_url):
        """validate_URL_existence is True when the opened URL has an OK code.

        mock_url is primed to return an object whose code is httplib.OK;
        mock_cs is primed to return None.
        """
        ok_response = Foo()
        ok_response.code = httplib.OK

        mock_cs.side_effect = [None]
        mock_url.side_effect = [ok_response]

        api = API.apiCalls.ApiCalls("", "", "", "", "")
        target_url = "http://google.com"
        expected = True

        outcome = api.validate_URL_existence(target_url)

        self.assertEqual(outcome, expected)
        API.apiCalls.urlopen.assert_called_with(
            target_url, timeout=api.max_wait_time)
apiCalls.py 文件源码 项目:irida-miseq-uploader 作者: phac-nml 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def session(self):
        """Return a working session, replacing it when the current one fails.

        An externally supplied session is returned as-is. Otherwise the
        current session is probed with an OPTIONS request against base_URL
        while holding the session lock; if the probe raises or does not
        answer OK, a new session is obtained from the OAuth service and
        stored on the instance.

        :return: the (possibly refreshed) session object
        """
        if self._session_set_externally:
            return self._session

        # Acquire before entering the try block so the finally clause can
        # never release a lock that was not successfully acquired.
        self._session_lock.acquire()
        try:
            try:
                response = self._session.options(self.base_URL)
                session_alive = response.status_code == httplib.OK
            except Exception:
                # Any probing error is treated like an expired token.
                # Unlike a bare except, this lets KeyboardInterrupt and
                # SystemExit propagate to the caller.
                session_alive = False

            if session_alive:
                logging.debug("Existing session still works, going to reuse it.")
            else:
                logging.debug("Token is probably expired, going to get a new session.")
                oauth_service = self.get_oauth_service()
                access_token = self.get_access_token(oauth_service)
                self._session = oauth_service.get_session(access_token)
        finally:
            self._session_lock.release()

        return self._session
tests.py 文件源码 项目:beg-django-e-commerce 作者: Apress 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_view_product(self):
        """ test product view loads """
        product = Product.active.all()[0]
        product_url = product.get_absolute_url()
        url_entry = urlresolvers.resolve(product_url)
        template_name = url_entry[2]['template_name']
        response = self.client.get(product_url)
        # assertTrue / assertNotEqual replace the deprecated failUnless /
        # failIfEqual aliases, which newer unittest versions no longer ship.
        self.assertTrue(response)
        self.assertEqual(response.status_code, httplib.OK)
        self.assertTemplateUsed(response, template_name)
        self.assertContains(response, product.name)
        self.assertContains(response, html.escape(product.description))
        # check for cart form in product page response
        cart_form = response.context[0]['form']
        self.assertTrue(cart_form)
        # check that the cart form is instance of correct form class
        self.assertTrue(isinstance(cart_form, ProductAddToCartForm))

        # the view must put product_reviews into the context
        product_reviews = response.context[0].get('product_reviews', None)
        self.assertNotEqual(product_reviews, None)
test_folders.py 文件源码 项目:orangecloud-client 作者: antechrestos 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_get_folder(self):
        """folders.get returns the folder described by the canned response.

        Checks the request made through the client and the id, name, parent,
        files and subfolders of the returned folder.
        """
        canned = mock_api_response('/folders/folder-id',
                                   httplib.OK,
                                   None,
                                   'folders', 'GET_{id}_response.json')
        self.client.get.return_value = canned

        result = self.folders.get('folder-id')

        self.client.get.assert_called_with(
            self.client.get.return_value.url, params=None)
        self.assertIsNotNone(result)

        # Attributes of the folder itself.
        self.assertEqual('folder-id', result.id)
        self.assertEqual('folder-name', result.name)
        self.assertEqual('root-id', result.parentId)

        # Exactly one file.
        self.assertEqual(1, len(result.files))
        self.assertEqual('file-id', result.files[0].id)

        # Exactly one subfolder, pointing back at this folder.
        self.assertEqual(1, len(result.subfolders))
        self.assertEqual('subfolder-id', result.subfolders[0].id)
        self.assertEqual('folder-id', result.subfolders[0].parentId)
ngas.py 文件源码 项目:ngas 作者: ICRAR 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_ngas_status():
    """
    Execute the STATUS command against the NGAS server on the host fabric is
    currently pointing at

    Raises the original IOError if the server cannot be reached, and
    ValueError (carrying the response text) if the response does not contain
    Status="SUCCESS".
    """
    try:
        serv = urllib2.urlopen('http://{0}:7777/STATUS'.format(env.host), timeout=5)
    except IOError:
        failure('Problem connecting to server {0}'.format(env.host))
        raise

    try:
        response = serv.read()
    finally:
        # Close the connection even if reading the body fails.
        serv.close()

    if 'Status="SUCCESS"' not in response:
        failure('Problem with response from {0}, not SUCESS as expected'.format(env.host))
        raise ValueError(response)

    success('Response from {0} OK'.format(env.host))
ngas.py 文件源码 项目:ngas 作者: ICRAR 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def upload_to(host, filename, port=7777):
    """
    Simple method to upload a file into NGAS

    POSTs the contents of ``filename`` to /QARCHIVE on ``host``:``port`` in
    4096-byte chunks, using the file's basename as the filename query
    parameter and its on-disk size as Content-Length.

    Raises Exception if the response status is not OK.
    """
    with contextlib.closing(httplib.HTTPConnection(host, port)) as conn:
        conn.putrequest('POST', '/QARCHIVE?filename=%s' % (urllib2.quote(os.path.basename(filename)),) )
        conn.putheader('Content-Length', os.stat(filename).st_size)
        conn.endheaders()
        # Binary mode: the bytes sent must match the st_size announced in
        # Content-Length; text mode can translate newlines on some platforms.
        # The b'' sentinel equals '' on Python 2, so the loop still ends at EOF.
        with open(filename, 'rb') as f:
            for data in iter(functools.partial(f.read, 4096), b''):
                conn.send(data)
        r = conn.getresponse()
        if r.status != httplib.OK:
            raise Exception("Error while QARCHIVE-ing %s to %s:%d:\nStatus: %d\n%s\n\n%s" % (filename, conn.host, conn.port, r.status, r.msg, r.read()))
        success("{0} successfully archived to {1}!".format(filename, host))
base.py 文件源码 项目:UPPlatform_Python_SDK 作者: Jawbone 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _request(self, url, method='GET', data=None, ok_statuses=None):
        """
        Send an HTTP request through self.http, check the response status, record the Meta object built from
        the response's 'meta' member, and return its 'data' member.

        :param url: endpoint to send the request
        :param method: HTTP method (e.g. GET, POST, etc.), defaults to GET
        :param data: optional values, URL-encoded into the request body when given
        :param ok_statuses: statuses passed to _raise_for_status, defaults to [httplib.OK]
        :return: the 'data' member of the decoded JSON response
        """
        #
        # TODO: clean up the ability to send a POST and add unit tests.
        #
        req_body = urllib.urlencode(data) if data is not None else None
        self.resp, self.content = self.http.request(url, method, body=req_body)

        expected = [httplib.OK] if ok_statuses is None else ok_statuses
        self._raise_for_status(expected)

        payload = json.loads(self.content)
        self.meta = upapi.meta.Meta(**payload['meta'])
        return payload['data']
base.py 文件源码 项目:UPPlatform_Python_SDK 作者: Jawbone 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def disconnect(self):
        """
        Revoke access for this user.

        Calls self.delete on the DISCONNECT endpoint and then clears the stored credentials.
        """
        self.delete(upapi.endpoints.DISCONNECT)
        # Only reached if the delete call above did not raise.
        self.credentials = None

    #
    # TODO: finish pubsub implementation.
    #
    # def set_pubsub(self, url):
    #     """
    #     Register a user-specific pubsub webhook.
    #
    #     :param url: user-specific webhook callback URL
    #     """
    #     resp = self.oauth.post(upapi.endpoints.PUBSUB, data={'webhook': url})
    #     self._raise_for_status([httplib.OK], resp)
    #
    # def delete_pubsub(self):
    #     """
    #     Delete a user-specific pubsub webhook.
    #     """
    #     self.delete(upapi.endpoints.PUBSUB)
test_base.py 文件源码 项目:UPPlatform_Python_SDK 作者: Jawbone 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test__raise_for_status(self, mock_resp):
        """
        Verify that an exception is raised only for statuses outside the accepted list.

        :param mock_resp: mocked httplib Response
        """
        #
        # A status that is in the accepted list must pass silently.
        #
        mock_resp.status = httplib.CREATED
        self.up.content = ''
        self.up.resp = mock_resp
        try:
            self.up._raise_for_status([httplib.OK, httplib.CREATED])
        except Exception as unexpected:
            self.fail('_raise_for_status unexpectedly threw {}'.format(unexpected))

        #
        # A status missing from the accepted list must raise.
        #
        mock_resp.status = httplib.ACCEPTED
        accepted = [httplib.OK, httplib.CREATED]
        self.assertRaises(
            upapi.exceptions.UnexpectedAPIResponse,
            self.up._raise_for_status,
            accepted)
sms_client.py 文件源码 项目:mWorkerService 作者: smices 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _parse_result(http_response, response):
    """Interpret an HTTP response, filling `response` or raising an error.

    :param http_response: object exposing status, reason, read() and close()
    :param response: object whose __dict__ is updated from a successful
        JSON body; its metadata.bce_request_id is used when building an
        error without a JSON error body
    :return: True for a 2xx response (with or without a body)
    :raise BceClientError: for a 1xx status
    :raise BceServerError: for any other outcome; status_code is set to the
        HTTP status
    """
    # Floor division keeps the status-class comparison integral on every
    # Python version; with true division 201 / 100 is 2.01 and never
    # matches the 2xx class.
    status_class = http_response.status // 100
    if status_class == httplib.CONTINUE // 100:
        raise BceClientError('Can not handle 1xx http status code')
    is_success = status_class == httplib.OK // 100

    bse = None
    body = http_response.read()
    if body:
        d = json.loads(body)

        # A body carrying message/code/requestId is an error description,
        # whatever the HTTP status was.
        if 'message' in d and 'code' in d and 'requestId' in d:
            bse = BceServerError(d['message'], code=d['code'], request_id=d['requestId'])
        elif is_success:
            response.__dict__.update(json.loads(
                body, object_hook=utils.dict_to_python_object).__dict__)
            http_response.close()
            return True
    elif is_success:
        return True

    if bse is None:
        bse = BceServerError(http_response.reason, request_id=response.metadata.bce_request_id)
    bse.status_code = http_response.status
    raise bse
sms_client.py 文件源码 项目:mWorkerService 作者: smices 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _parse_result(http_response, response):
    """Interpret an HTTP response, filling `response` or raising an error.

    :param http_response: object exposing status, reason, read() and close()
    :param response: object whose __dict__ is updated from a successful
        JSON body; its metadata.bce_request_id is used when building an
        error without a JSON error body
    :return: True for a 2xx response (with or without a body)
    :raise BceClientError: for a 1xx status
    :raise BceServerError: for any other outcome; status_code is set to the
        HTTP status
    """
    # Floor division keeps the status-class comparison integral on every
    # Python version; with true division 201 / 100 is 2.01 and never
    # matches the 2xx class.
    status_class = http_response.status // 100
    if status_class == httplib.CONTINUE // 100:
        raise BceClientError('Can not handle 1xx http status code')
    is_success = status_class == httplib.OK // 100

    bse = None
    body = http_response.read()
    if body:
        d = json.loads(body)

        # A body carrying message/code/requestId is an error description,
        # whatever the HTTP status was.
        if 'message' in d and 'code' in d and 'requestId' in d:
            bse = BceServerError(d['message'], code=d['code'], request_id=d['requestId'])
        elif is_success:
            response.__dict__.update(json.loads(
                body, object_hook=utils.dict_to_python_object).__dict__)
            http_response.close()
            return True
    elif is_success:
        return True

    if bse is None:
        bse = BceServerError(http_response.reason, request_id=response.metadata.bce_request_id)
    bse.status_code = http_response.status
    raise bse
common.py 文件源码 项目:qtip 作者: opnfv 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def check_endpoint_for_error(resource, operation=None):
    """Build a decorator that maps handler errors to problem responses.

    The wrapped handler is called with ``name``. On success its result is
    returned together with httplib.OK. error.NotFoundError becomes a
    NOT_FOUND problem and error.ToBeDoneError a NOT_IMPLEMENTED problem,
    both built with connexion.problem.

    :param resource: resource name used in the problem texts
    :param operation: operation name used in the NOT_IMPLEMENTED texts;
        may be left as None
    :return: decorator for a handler taking a single ``name`` argument
    """
    def _decorator(func):
        def _execute(name=None):
            try:
                return func(name), httplib.OK
            except error.NotFoundError:
                return connexion.problem(
                    httplib.NOT_FOUND,
                    '{} not found'.format(resource),
                    'Requested {} `{}` not found.'
                    .format(resource.lower(), name))
            except error.ToBeDoneError:
                # operation defaults to None; calling .lower() on it would
                # replace the intended problem response with an
                # AttributeError.
                if operation is None:
                    operation_label = operation
                else:
                    operation_label = operation.lower()
                return connexion.problem(
                    httplib.NOT_IMPLEMENTED,
                    '{} handler not implemented'.format(operation),
                    'Requested operation `{}` on {} not implemented.'
                    .format(operation_label, resource.lower()))
        return _execute
    return _decorator
utils.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: forwarded to open_url as its data argument
    @param handlers: forwarded to open_url as its handlers argument
    @return: data read from the response
    @raise URLFetchError: if the returned status code is not OK; the
    exception carries the message returned by open_url
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code != http_client_.OK:
        raise URLFetchError(return_message)

    try:
        return response.read()
    finally:
        # Close the response even when read() fails part-way through.
        response.close()
utils.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: forwarded to open_url as its data argument
    @param handlers: forwarded to open_url as its handlers argument
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        try:
            return_data = response.read()
        finally:
            # Close the response even when read() fails.
            response.close()
        # The context manager closes the file even if write() raises.
        # NOTE(review): text mode "w" is kept from the original; if read()
        # returns bytes (Python 3) this probably needs "wb" - confirm.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)

    return return_code, return_message, succeeded


问题


面经


文章

微信
公众号

扫码关注公众号