python类OK的实例源码

utils.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Open a URL and hand back the response object without reading it.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: the response object produced by open_url; it is neither read
    nor closed here
    @rtype: file derived type
    @raise URLFetchError: if the status code reported by open_url is falsy
    or differs from http_client_.OK
    """
    status, message, stream = open_url(url, config, data=data,
                                       handlers=handlers)
    # Guard clause: anything other than a truthy OK status is a failure.
    if not status or status != http_client_.OK:
        raise URLFetchError(message)
    return stream
utils.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns the full body retrieved from a URL.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @param handlers: list of custom urllib2 handlers to add to the request
    @return: data read from the response
    @raise URLFetchError: if the status code reported by open_url is falsy
    or differs from http_client_.OK
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if not return_code or return_code != http_client_.OK:
        raise URLFetchError(return_message)
    # Close the response even when read() fails, so the underlying
    # connection is not leaked on a broken transfer.
    try:
        return response.read()
    finally:
        response.close()
utils.py 文件源码 项目:package-33c3 作者: info-beamer 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.

    The file is only created/overwritten when the request succeeded.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: HTTP POST data
    @param handlers: list of custom urllib2 handlers to add to the request
    @return: tuple (
        status code as reported by open_url
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        # Always release the response, even if reading it fails.
        try:
            return_data = response.read()
        finally:
            response.close()
        # The context manager closes the file even if write() raises.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)

    return return_code, return_message, succeeded
__init__.py 文件源码 项目:micromaterials-api 作者: lpmi-13 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def deconstruct_list(response_data, model=None, status_code=OK):
    """Unpack a positional view response into (view_name, model, status_code).

    ``response_data`` is read positionally: item 0 is passed through
    ``required_view_name``, item 1 (if present) replaces ``model`` and item 2
    (if present) replaces ``status_code``.  A ``None`` model becomes ``{}``
    and a ``None`` status code becomes ``OK``.

    Raises ``KrumpException`` when ``response_data`` is empty/falsy.
    """
    if not response_data:
        raise KrumpException(
            'Response data is empty; the view name at least is required.')

    item_count = len(response_data)
    view_name = (
        required_view_name(response_data[0]) if item_count >= 1 else None)
    if item_count >= 2:
        model = response_data[1]
    if item_count >= 3:
        status_code = response_data[2]

    # Fall back to the defaults when a slot was left as None.
    return (
        view_name,
        {} if model is None else model,
        OK if status_code is None else status_code,
    )
metadata_server_test.py 文件源码 项目:forseti-security 作者: GoogleCloudPlatform 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_get_value_for_attribute_with_a_present_attribute(self, mock_meta_req):
        """get_value_for_attribute returns the body of an OK response.

        Setup:
            * Patch httplib.HTTPResponse with a mock_open-based stand-in
              whose read data is the expected string and whose status is OK.
            * Use that stand-in as the side effect of the mocked request.

        Expected results:
            * The returned value equals the expected string.
        """
        expected = 'expected_response'
        fake_response_cls = mock.mock_open(read_data=expected)

        with mock.patch('httplib.HTTPResponse', fake_response_cls) as patched:
            patched.return_value.status = httplib.OK
            mock_meta_req.side_effect = patched

            actual = metadata_server.get_value_for_attribute('')

        self.assertEqual(actual, expected)
brightcove.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _refresh_access_token(self):
        """
        Request a new access token to send with requests to Brightcove.

        Posts the client credentials (HTTP Basic auth built from
        ``self.api_key`` and ``self.api_secret``) to the OAuth endpoint.

        Returns:
            The ``access_token`` value from the JSON response when the status
            is OK; otherwise None (implicit in the original, explicit here).
        """
        credentials = '{}:{}'.format(self.api_key, self.api_secret)
        # encodestring inserts line breaks; strip them to get a header-safe value.
        basic_token = base64.encodestring(credentials).replace('\n', '')

        resp = requests.post(
            "https://oauth.brightcove.com/v3/access_token",
            headers={
                "Content-Type": "application/x-www-form-urlencoded",
                "Authorization": "Basic " + basic_token
            },
            data={"grant_type": "client_credentials"},
        )
        if resp.status_code != httplib.OK:
            return None
        return resp.json()['access_token']
brightcove.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get(self, url, headers=None, can_retry=True):
        """
        Perform an authorised GET against the given URL and decode the JSON reply.

        Arguments:
            url (str): API url to fetch a resource from.
            headers (dict): Extra headers; merged over the default bearer Authorization header.
            can_retry (bool): When True, an UNAUTHORIZED reply triggers one token refresh and a single retry.
        Returns:
            The decoded JSON body of an OK response.
        Raises:
            BrightcoveApiClientError: for any other outcome.
        """
        request_headers = {'Authorization': 'Bearer ' + str(self.access_token)}
        if headers is not None:
            request_headers.update(headers)

        resp = requests.get(url, headers=request_headers)
        status = resp.status_code
        if status == httplib.OK:
            return resp.json()
        if not (can_retry and status == httplib.UNAUTHORIZED):
            raise BrightcoveApiClientError

        # Token rejected: refresh it and retry exactly once.
        self.access_token = self._refresh_access_token()
        return self.get(url, headers, can_retry=False)
vimeo.py 文件源码 项目:xblock-video 作者: appsembler 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get(self, url, headers=None, can_retry=False):
        """
        Perform an authorised GET against the given URL and decode the JSON reply.

        Arguments:
            url (str): API url to fetch a resource from.
            headers (dict): Extra headers; merged over the default Authorization
            and Accept headers.
            can_retry (bool): Accepted but not used by this implementation.
        Returns:
            The decoded JSON body of an OK response.
        Raises:
            VimeoApiClientError: when the response status is not OK.
        """
        bearer = self.access_token.encode(encoding='utf-8')
        request_headers = {
            'Authorization': 'Bearer {}'.format(bearer),
            'Accept': 'application/json'
        }
        if headers is not None:
            request_headers.update(headers)

        resp = requests.get(url, headers=request_headers)
        if resp.status_code != httplib.OK:
            raise VimeoApiClientError(_("Can't fetch requested data from API."))
        return resp.json()
request_handler.py 文件源码 项目:Protector 作者: trivago 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def connect_intercept(self):
        """Answer a CONNECT request by terminating TLS locally.

        Derives the target hostname from ``self.path``, makes sure a
        certificate file for it exists under ``self.certdir`` (generating one
        with two chained ``openssl`` processes if missing), replies with a
        200 status line, then wraps the client connection in a server-side
        TLS socket and rebinds ``rfile``/``wfile`` to it.  Finally the
        ``Proxy-Connection`` header decides whether the connection is kept
        open.
        """
        # NOTE(review): hostname comes straight from the request path and is
        # used both in a file path and in the certificate subject -- confirm
        # it is validated upstream.
        hostname = self.path.split(':')[0]
        certpath = "%s/%s.crt" % (self.certdir.rstrip('/'), hostname)

        # Serialise certificate generation so concurrent requests for the same
        # host do not write the same file at once.
        with self.lock:
            if not os.path.isfile(certpath):
                epoch = "%d" % (time.time() * 1000)
                p1 = Popen(["openssl", "req", "-new", "-key", self.certkey, "-subj", "/CN=%s" % hostname], stdout=PIPE)
                p2 = Popen(["openssl", "x509", "-req", "-days", "3650", "-CA", self.cacert, "-CAkey", self.cakey,
                            "-set_serial", epoch, "-out", certpath], stdin=p1.stdout, stderr=PIPE)
                # Drop our copy of the pipe so p1 gets SIGPIPE if p2 exits
                # early, and so the descriptor is not leaked.
                p1.stdout.close()
                p2.communicate()
                # Reap the first process; otherwise it lingers as a zombie.
                p1.wait()

        self.wfile.write("%s %d %s\r\n" % (self.protocol_version, httplib.OK, 'Connection Established'))
        self.end_headers()

        # From here on the client talks TLS to us using the per-host certificate.
        self.connection = ssl.wrap_socket(self.connection, keyfile=self.certkey, certfile=certpath, server_side=True)
        self.rfile = self.connection.makefile("rb", self.rbufsize)
        self.wfile = self.connection.makefile("wb", self.wbufsize)

        conntype = self.headers.get('Proxy-Connection', '')
        if conntype.lower() == 'close':
            self.close_connection = 1
        elif conntype.lower() == 'keep-alive' and self.protocol_version >= "HTTP/1.1":
            self.close_connection = 0
request_handler.py 文件源码 项目:Protector 作者: trivago 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def connect_relay(self):
        """Answer a CONNECT request by blindly relaying bytes both ways.

        Parses ``host[:port]`` from ``self.path`` (a missing or zero port
        falls back to 443), opens a TCP connection to it and, on success,
        replies 200 and shuttles data between the client connection and the
        upstream socket until either side closes, an exceptional condition is
        reported, or ``self.timeout`` elapses with no traffic.  If the
        upstream connection cannot be opened a BAD_GATEWAY error is sent.
        """
        host, _sep, port_text = self.path.partition(':')
        # A path without an explicit port used to raise IndexError; treat it
        # like port 0 so the 443 default applies.
        port = int(port_text) if port_text else 0
        address = [host, port or 443]
        try:
            s = socket.create_connection(address, timeout=self.timeout)
        except Exception:
            # Any connection failure is reported to the client as a bad
            # gateway; KeyboardInterrupt/SystemExit are no longer swallowed.
            self.send_error(httplib.BAD_GATEWAY)
            return

        try:
            self.send_response(httplib.OK, 'Connection Established')
            self.end_headers()

            conns = [self.connection, s]
            self.close_connection = 0
            while not self.close_connection:
                rlist, wlist, xlist = select.select(conns, [], conns, self.timeout)
                # Stop on socket error or on idle timeout (nothing readable).
                if xlist or not rlist:
                    break
                for r in rlist:
                    other = conns[1] if r is conns[0] else conns[0]
                    data = r.recv(8192)
                    if not data:
                        # Peer closed its side: end the relay.
                        self.close_connection = 1
                        break
                    other.sendall(data)
        finally:
            # The upstream socket is owned by this method; always release it.
            s.close()
utils.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns the full body retrieved from a URL.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @param handlers: list of custom urllib2 handlers to add to the request
    @return: data read from the response
    @raise URLFetchError: if the status code reported by open_url is falsy
    or differs from http_client_.OK
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if not return_code or return_code != http_client_.OK:
        raise URLFetchError(return_message)
    # Close the response even when read() fails, so the underlying
    # connection is not leaked on a broken transfer.
    try:
        return response.read()
    finally:
        response.close()
utils.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.

    The file is only created/overwritten when the request succeeded.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: HTTP POST data
    @param handlers: list of custom urllib2 handlers to add to the request
    @return: tuple (
        status code as reported by open_url
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        # Always release the response, even if reading it fails.
        try:
            return_data = response.read()
        finally:
            response.close()
        # The context manager closes the file even if write() raises.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)

    return return_code, return_message, succeeded
utils.py 文件源码 项目:slack_scholar 作者: xLeitix 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Open a URL and hand back the response object without reading it.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: the response object produced by open_url; it is neither read
    nor closed here
    @rtype: file derived type
    @raise URLFetchError: if the status code reported by open_url is falsy
    or differs from http_client_.OK
    """
    status, message, stream = open_url(url, config, data=data,
                                       handlers=handlers)
    # Guard clause: anything other than a truthy OK status is a failure.
    if not status or status != http_client_.OK:
        raise URLFetchError(message)
    return stream
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def wopiGetFile(fileid):
  '''Implements the GetFile WOPI call.

  Decodes the JWT passed as the `access_token` query argument with the
  server secret, rejects it if its `exp` claim is in the past, and otherwise
  returns an octet-stream response whose body is produced by xrdcl.readfile
  for the file and user identified in the token.

  Returns a 401 tuple when the token cannot be decoded or has expired; any
  other exception is delegated to _logGeneralExceptionAndReturn.'''
  Wopi.refreshconfig()
  try:
    acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
    if acctok['exp'] < time.time():
      raise jwt.exceptions.ExpiredSignatureError
    # only the tail of the token is logged on the success path
    Wopi.log.info('msg="GetFile" user="%s:%s" filename="%s" fileid="%s" token="%s"' % \
                  (acctok['ruid'], acctok['rgid'], acctok['filename'], fileid, flask.request.args['access_token'][-20:]))
    # stream file from storage to client
    resp = flask.Response(xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']), mimetype='application/octet-stream')
    resp.status_code = httplib.OK
    return resp
  except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError):
    # NOTE(review): the full token is logged here, unlike the truncated form above -- confirm this is intended
    Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
                     (flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
    return 'Invalid access token', httplib.UNAUTHORIZED
  except Exception as e:
    # `as` form matches the handler above and is valid on Python 2.6+ and 3
    return _logGeneralExceptionAndReturn(e, flask.request)


#
# The following operations are all called on POST /wopi/files/<fileid>
#
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def wopiUnlock(fileid, reqheaders, acctok):
  '''Implements the Unlock WOPI call.

  Compares the lock sent in the X-WOPI-Lock header with the stored one; on
  mismatch returns the conflict response, otherwise performs best-effort
  cleanup of the lock file, the last-save-time attribute and the internal
  open-files entry, and returns ('OK', 200).'''
  requestedLock = reqheaders['X-WOPI-Lock']
  storedLock = _retrieveWopiLock(fileid, 'UNLOCK', requestedLock, acctok)
  if not _compareWopiLocks(storedLock, requestedLock):
    return _makeConflictResponse('UNLOCK', storedLock, requestedLock, '', acctok['filename'])

  # the lock matches: from here on every cleanup step is best-effort
  filename = acctok['filename']
  try:
    xrdcl.removefile(_getLockName(filename), Wopi.lockruid, Wopi.lockrgid, 1)
  except IOError:
    # not worth reporting
    pass
  try:
    xrdcl.rmxattr(filename, acctok['ruid'], acctok['rgid'], LASTSAVETIMEKEY)
  except IOError:
    # not worth reporting either
    pass
  # drop the file from the internal list of opened files
  try:
    del Wopi.openfiles[filename]
  except KeyError:
    # entry was not there (already removed?)
    pass
  return 'OK', httplib.OK
wopiserver.py 文件源码 项目:wopiserver 作者: cernbox 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def wopiGetLock(fileid, reqheaders_unused, acctok):
  '''Implements the GetLock WOPI call.

  Builds a 200 response whose X-WOPI-Lock header carries the value returned
  by _retrieveWopiLock. When that value is non-empty, the per-file user set
  kept in Wopi.openfiles is updated with the token's username, and a log line
  is emitted once more than one user is recorded for the file.

  reqheaders_unused is not read by this function.'''
  resp = flask.Response()
  # throws exception if no lock
  # (note from the original author; _retrieveWopiLock is not visible here)
  resp.headers['X-WOPI-Lock'] = _retrieveWopiLock(fileid, 'GETLOCK', '', acctok)
  resp.status_code = httplib.OK
  # for statistical purposes, check whether a lock exists and update internal bookkeeping
  if resp.headers['X-WOPI-Lock']:
    # NOTE: the single KeyError handler below covers every lookup inside the try,
    # not only the Wopi.openfiles access
    try:
      # the file was already opened for write, check whether this is a new user
      if not acctok['username'] in Wopi.openfiles[acctok['filename']][1]:
        # yes it's a new user
        Wopi.openfiles[acctok['filename']][1].add(acctok['username'])
        if len(Wopi.openfiles[acctok['filename']][1]) > 1:
          # for later monitoring, explicitly log that this file is being edited by at least two users
          Wopi.log.info('msg="Collaborative editing detected" filename="%s" token="%s" users="%s"' % \
                         (acctok['filename'], flask.request.args['access_token'][-20:], list(Wopi.openfiles[acctok['filename']][1])))
    except KeyError:
      # existing lock but missing Wopi.openfiles[acctok['filename']] ?
      # (re)create the entry as (current time string, set holding this user)
      Wopi.openfiles[acctok['filename']] = (time.asctime(), sets.Set([acctok['username']]))
  return resp
stub_dispatcher.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _copy(gcs_stub, filename, headers):
  """Copy the object named by the copy-source header to `filename`.

  Args:
    gcs_stub: an instance of gcs stub.
    filename: dst filename of format /bucket/filename
    headers: a dict of request headers. Must contain _XGoogCopySource header.
      The 'x-goog-metadata-directive' entry, if present, is popped from it.

  Returns:
    An _FakeUrlFetchResult instance: the HEAD result itself when the source
    is not found, otherwise an empty OK result.
  """
  source = _XGoogCopySource(headers).value
  head_result = _handle_head(gcs_stub, source)
  if head_result.status_code == httplib.NOT_FOUND:
    return head_result

  # Only the REPLACE directive forwards the request headers to the copy;
  # anything else (default COPY) passes None.
  replace_metadata = headers.pop('x-goog-metadata-directive', 'COPY') == 'REPLACE'
  gcs_stub.put_copy(source, filename, headers if replace_metadata else None)
  return _FakeUrlFetchResult(httplib.OK, {}, '')
stub_dispatcher.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _handle_head(gcs_stub, filename):
  """Handle HEAD request.

  Args:
    gcs_stub: an instance of gcs stub.
    filename: name of the object to stat.

  Returns:
    An _FakeUrlFetchResult instance: NOT_FOUND with no headers when the stub
    has no stat for the object, otherwise OK with headers built from the stat
    (plus its metadata, when present) and an empty body.
  """
  stat = gcs_stub.head_object(filename)
  if not stat:
    return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')

  last_modified = common.posix_time_to_http(stat.st_ctime)

  response_headers = {}
  response_headers['x-goog-stored-content-length'] = stat.st_size
  # The response carries no body, so content-length is reported as 0.
  response_headers['content-length'] = 0
  response_headers['content-type'] = stat.content_type
  response_headers['etag'] = stat.etag
  response_headers['last-modified'] = last_modified

  if stat.metadata:
    response_headers.update(stat.metadata)

  return _FakeUrlFetchResult(httplib.OK, response_headers, '')


问题


面经


文章

微信
公众号

扫码关注公众号