def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
Python 类 HTTPResponse() 的实例源码
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
__init__.py 文件源码
项目:amazon-alexa-twilio-customer-service
作者: ameerbadri
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
def parse_json(http_response, response):
    """Decode a JSON body, if any, and merge its attributes into ``response``.

    The body is read from ``http_response``. When it is non-empty it is parsed
    with ``json.loads`` (using ``utils.dict_to_python_object`` as the object
    hook) and the ``__dict__`` of the parsed object is merged into
    ``response.__dict__``. ``http_response`` is always closed before this
    function exits, including when reading or decoding raises, so a malformed
    body no longer leaves the connection open.

    :param http_response: the http_response object returned by HTTPConnection.getresponse()
    :type http_response: httplib.HTTPResponse
    :param response: general response object which will be returned to the caller
    :type response: baidubce.BceResponse
    :return: always true
    :rtype: bool
    """
    try:
        body = http_response.read()
        if body:
            parsed = json.loads(body, object_hook=utils.dict_to_python_object)
            response.__dict__.update(parsed.__dict__)
    finally:
        # Release the connection on every path; any error still propagates.
        http_response.close()
    return True
def _parse_bos_object(http_response, response):
    """Attach the raw HTTP response and its user metadata to ``response``.

    Every header whose name starts with
    ``http_headers.BCE_USER_METADATA_PREFIX`` is collected into a dict, keyed
    by the header name with that prefix removed. Names and values are decoded
    with ``baidubce.DEFAULT_ENCODING`` (this assumes they are byte strings, as
    under Python 2). The dict is stored on
    ``response.metadata.user_metadata`` and ``http_response`` itself, unread
    and not closed, is stored on ``response.data``.

    :param http_response: the http_response object returned by HTTPConnection.getresponse()
    :type http_response: httplib.HTTPResponse
    :param response: general response object which will be returned to the caller
    :type response: baidubce.BceResponse
    :return: always true
    :rtype: bool
    """
    collected = {}
    for header_name, header_value in http_response.getheaders():
        if not header_name.startswith(http_headers.BCE_USER_METADATA_PREFIX):
            continue
        # Drop the prefix so only the user-chosen part of the name remains.
        short_name = header_name[len(http_headers.BCE_USER_METADATA_PREFIX):]
        collected[short_name.decode(baidubce.DEFAULT_ENCODING)] = \
            header_value.decode(baidubce.DEFAULT_ENCODING)
    response.metadata.user_metadata = collected
    response.data = http_response
    return True
def parse_json(http_response, response):
    """Decode a JSON body, if any, and merge its attributes into ``response``.

    The body is read from ``http_response``. When it is non-empty it is parsed
    with ``json.loads`` (using ``utils.dict_to_python_object`` as the object
    hook) and the ``__dict__`` of the parsed object is merged into
    ``response.__dict__``. ``http_response`` is always closed before this
    function exits, including when reading or decoding raises, so a malformed
    body no longer leaves the connection open.

    :param http_response: the http_response object returned by HTTPConnection.getresponse()
    :type http_response: httplib.HTTPResponse
    :param response: general response object which will be returned to the caller
    :type response: baidubce.BceResponse
    :return: always true
    :rtype: bool
    """
    try:
        body = http_response.read()
        if body:
            parsed = json.loads(body, object_hook=utils.dict_to_python_object)
            response.__dict__.update(parsed.__dict__)
    finally:
        # Release the connection on every path; any error still propagates.
        http_response.close()
    return True
def Delete(self, uri, extra_headers=None, url_params=None,
           escape_params=True):
    """Sends a DELETE request for the entry at the given URI.

    Args:
      uri: string The URI of the entry to be deleted. Example:
           '/base/feeds/items/ITEM-ID'
      extra_headers: dict (optional) HTTP headers to include; passed to
                     self.request as `headers`.
      url_params: dict (optional) Additional URL parameters; passed to
                  self.request unchanged as `url_params`.
                  Example: {'max-results': '250'}
      escape_params: boolean (optional) Accepted as part of the signature,
                     but this method does not forward it to self.request,
                     so it has no effect here.

    Returns:
      Whatever self.request returns for the DELETE call (documented by the
      original author as an httplib.HTTPResponse).
    """
    forwarded = {
        'data': None,
        'headers': extra_headers,
        'url_params': url_params,
    }
    return self.request('DELETE', uri, **forwarded)
def create_tcp_connection(self, hostname, port, timeout, **kwargs):
    """Open a socket to the configured proxy and issue an HTTP CONNECT.

    A TCP connection is made to ``self.proxy_host:self.proxy_port`` and a
    ``CONNECT hostname:port`` request is sent over it. A hostname ending in
    ``.appspot.com`` is replaced by ``www.google.com`` in that request. When
    both ``self.proxy_username`` and ``self.proxy_password`` are set, a Basic
    ``Proxy-Authorization`` header is added.

    NOTE(review): ``timeout`` and ``kwargs`` are accepted but not used by this
    method -- confirm whether the connect call was meant to honour ``timeout``.

    Returns:
        The connected socket, once the proxy replied with a status below 400.

    Raises:
        httplib.BadStatusLine: the proxy answered with a status >= 400.
        Any error raised while sending the request or parsing the reply.
        In every failure case the socket is closed before the error
        propagates, so a refused tunnel does not leak the descriptor.
    """
    sock = socket.create_connection((self.proxy_host, int(self.proxy_port)))
    try:
        if hostname.endswith('.appspot.com'):
            hostname = 'www.google.com'
        request_data = 'CONNECT %s:%s HTTP/1.1\r\n' % (hostname, port)
        if self.proxy_username and self.proxy_password:
            credentials = '%s:%s' % (self.proxy_username, self.proxy_password)
            token = base64.b64encode(credentials.encode()).decode().strip()
            request_data += 'Proxy-Authorization: Basic %s\r\n' % token
        request_data += '\r\n'
        sock.sendall(request_data)
        response = httplib.HTTPResponse(sock)
        # Swap the reader HTTPResponse created for an unbuffered one --
        # presumably so bytes following the proxy's reply are not held back
        # in a read buffer (TODO confirm).
        response.fp.close()
        response.fp = sock.makefile('rb', 0)
        response.begin()
        if response.status >= 400:
            raise httplib.BadStatusLine('%s %s %s' % (response.version, response.status, response.reason))
    except Exception:
        # The caller never receives the socket on failure, so release it here.
        sock.close()
        raise
    return sock
def test_appid_exist(ssl_sock, appid):
    """Probe ``<appid>.appspot.com`` over ``ssl_sock`` and report whether it looks usable.

    A ``GET /_gh/`` request is sent on the given socket and the reply is
    parsed with ``httplib.HTTPResponse``:

    * status 404 -> ``False``;
    * status 503 -> ``True`` (treated as "out of quota" by the original author);
    * any other status -> a warning is logged unless it is 200, then the body
      is read and the result is whether it contains the text ``"GoAgent"``.
    """
    request_data = 'GET /_gh/ HTTP/1.1\r\nHost: %s.appspot.com\r\n\r\n' % appid
    ssl_sock.send(request_data.encode())

    response = httplib.HTTPResponse(ssl_sock, buffering=True)
    response.begin()

    # Statuses that settle the answer without looking at the body.
    early_verdicts = {404: False, 503: True}
    if response.status in early_verdicts:
        return early_verdicts[response.status]

    if response.status != 200:
        xlog.warn("test appid %s status:%d", appid, response.status)

    return "GoAgent" in response.read()
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied and
      ``status`` comes from the ``'status'`` entry, falling back to the value
      already readable on ``self``.

    Names are lower-cased in every branch. Previously only the HTTPResponse
    branch did so, which meant a ``Status`` entry supplied by an email message
    or a dict was not found by the ``'status'`` lookups below.
    """
    if isinstance(info, httplib.HTTPResponse):
        for key, value in info.getheaders():
            self[key.lower()] = value
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif isinstance(info, email.Message.Message):
        for key, value in info.items():
            # Header names are case-insensitive; store one canonical form.
            self[key.lower()] = value
        self.status = int(self['status'])
    else:
        for key, value in info.iteritems():
            self[key.lower()] = value
        self.status = int(self.get('status', self.status))
def test_status_lines(self):
    """Check handling of one well-formed and one malformed HTTP status line."""
    # Well-formed status line followed by a four-byte body.
    good = httplib.HTTPResponse(FakeSocket("HTTP/1.1 200 Ok\r\n\r\nText"))
    good.begin()
    # Issue #20007: a zero-length read returns '' and leaves the response open.
    self.assertEqual(good.read(0), '')
    self.assertFalse(good.isclosed())
    # Reading the whole body closes the response.
    self.assertEqual(good.read(), 'Text')
    self.assertTrue(good.isclosed())

    # A non-integer status code must be rejected when the reply is parsed.
    bad = httplib.HTTPResponse(FakeSocket("HTTP/1.1 400.100 Not Ok\r\n\r\nText"))
    self.assertRaises(httplib.BadStatusLine, bad.begin)
def test_response_headers(self):
    """Repeated headers with the same field name are joined with ', '."""
    raw_reply = ('HTTP/1.1 200 OK\r\n'
                 'Set-Cookie: Customer="WILE_E_COYOTE";'
                 ' Version="1"; Path="/acme"\r\n'
                 'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
                 ' Path="/acme"\r\n'
                 '\r\n'
                 'No body\r\n')
    # Both Set-Cookie values, in order, separated by a comma and a space.
    expected = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
                ', '
                'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')

    reply = httplib.HTTPResponse(FakeSocket(raw_reply))
    reply.begin()
    combined = reply.getheader("Set-Cookie")
    if combined != expected:
        self.fail("multiple headers not combined properly")
def test_chunked_head(self):
    """A HEAD response yields an empty body even when chunked data follows."""
    wire_bytes = (
        'HTTP/1.1 200 OK\r\n'
        'Transfer-Encoding: chunked\r\n\r\n'
        'a\r\n'
        'hello world\r\n'
        '1\r\n'
        'd\r\n'
        '0\r\n'
    )
    head_reply = httplib.HTTPResponse(FakeSocket(wire_bytes), method="HEAD")
    head_reply.begin()
    # Nothing is read for HEAD, yet status and reason are still parsed and
    # the response ends up closed.
    self.assertEqual(head_reply.read(), '')
    self.assertEqual(head_reply.status, 200)
    self.assertEqual(head_reply.reason, 'OK')
    self.assertTrue(head_reply.isclosed())
def test_close(self):
    """Closing a urllib2 response must close the socket file object beneath it."""
    import httplib
    # Walk down the layers of the response to reach the innermost file
    # object: response.fp -> its _sock (the httplib response) -> that
    # response's fp. Each layer's class is checked on the way.
    response = _urlopen_with_retry("http://www.example.com/")
    outer_file = response.fp
    self.assertIs(outer_file.__class__, socket._fileobject)
    http_layer = outer_file._sock
    self.assertIs(http_layer.__class__, httplib.HTTPResponse)
    inner_file = http_layer.fp
    self.assertIs(inner_file.__class__, socket._fileobject)

    # Open before close(), closed afterwards.
    self.assertTrue(not inner_file.closed)
    response.close()
    self.assertTrue(inner_file.closed)
def __init__(self, info):
    """Fill this mapping with lower-cased header names taken from ``info``.

    ``info`` is handled in one of three ways:

    * an ``httplib.HTTPResponse`` -- its headers are copied; ``status``,
      ``reason`` and ``version`` are taken from the response object, and the
      ``'status'`` entry is set to the status rendered as a string;
    * an ``email.Message.Message`` -- its items are copied and ``status`` is
      parsed with ``int()`` from the ``'status'`` entry, which must exist;
    * anything else offering ``iteritems()`` -- its items are copied, and
      ``status`` / ``reason`` come from the ``'status'`` / ``'reason'``
      entries, falling back to the values already readable on ``self``
      (presumably class-level defaults -- confirm against the class body).
    """
    from_http = isinstance(info, httplib.HTTPResponse)
    # The email type is only consulted when the first check failed, exactly
    # as an if/elif chain would do.
    from_email = not from_http and isinstance(info, email.Message.Message)

    if from_http:
        pairs = info.getheaders()
    elif from_email:
        pairs = info.items()
    else:
        pairs = info.iteritems()

    # Header names are always stored lower-cased.
    for name, content in pairs:
        self[name.lower()] = content

    if from_http:
        self.status = info.status
        self['status'] = str(self.status)
        self.reason = info.reason
        self.version = info.version
    elif from_email:
        self.status = int(self['status'])
    else:
        self.status = int(self.get('status', self.status))
        self.reason = self.get('reason', self.reason)