def __init__(self, cacheLocation,url,setCacheHeader=True):
self.cacheLocation = cacheLocation
hash = md5.new(url).hexdigest()
StringIO.StringIO.__init__(self, file(self.cacheLocation + "/" + hash+".body").read())
self.url = url
self.code = 200
self.msg = "OK"
headerbuf = file(self.cacheLocation + "/" + hash+".headers").read()
if setCacheHeader:
headerbuf += "x-cache: %s/%s\r\n" % (self.cacheLocation,hash)
self.headers = httplib.HTTPMessage(StringIO.StringIO(headerbuf))
python类HTTPMessage()的实例源码
def __init__(self, response_proto):
"""Constructor.
Args:
response_proto: the URLFetchResponse proto buffer to wrap.
"""
self.__pb = response_proto
self.content = response_proto.content()
self.status_code = response_proto.statuscode()
self.content_was_truncated = response_proto.contentwastruncated()
self.final_url = response_proto.finalurl() or None
self.header_msg = httplib.HTTPMessage(
StringIO.StringIO(''.join(['%s: %s\n' % (h.key(), h.value())
for h in response_proto.header_list()] + ['\n'])))
self.headers = _CaselessDict(self.header_msg.items())
def __init__(self, response_proto):
"""Constructor.
Args:
response_proto: the URLFetchResponse proto buffer to wrap.
"""
self.__pb = response_proto
self.content = response_proto.content()
self.status_code = response_proto.statuscode()
self.content_was_truncated = response_proto.contentwastruncated()
self.final_url = response_proto.finalurl() or None
self.header_msg = httplib.HTTPMessage(
StringIO.StringIO(''.join(['%s: %s\n' % (h.key(), h.value())
for h in response_proto.header_list()] + ['\n'])))
self.headers = _CaselessDict(self.header_msg.items())
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
headers = httplib.HTTPMessage(StringIO(data))
response = urllib.addinfourl(StringIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response
def to_addinfourl(response):
"""Convert a `django.http.HttpResponse` to a `urllib2.addinfourl`."""
headers_raw = response.serialize_headers()
headers = httplib.HTTPMessage(io.BytesIO(headers_raw))
return urllib2.addinfourl(
fp=io.BytesIO(response.content), headers=headers,
url=None, code=response.status_code)
def __init__(self, response_proto):
"""Constructor.
Args:
response_proto: the URLFetchResponse proto buffer to wrap.
"""
self.__pb = response_proto
self.content = response_proto.content()
self.status_code = response_proto.statuscode()
self.content_was_truncated = response_proto.contentwastruncated()
self.final_url = response_proto.finalurl() or None
self.header_msg = httplib.HTTPMessage(
StringIO.StringIO(''.join(['%s: %s\n' % (h.key(), h.value())
for h in response_proto.header_list()] + ['\n'])))
self.headers = _CaselessDict(self.header_msg.items())
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
if PY2:
headers = HTTPMessage(BytesIO(data))
else:
import email
headers = email.message_from_string(data)
response = addinfourl(BytesIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
headers = httplib.HTTPMessage(StringIO(data))
response = urllib.addinfourl(StringIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
headers = httplib.HTTPMessage(StringIO(data))
response = urllib.addinfourl(StringIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response
def __init__(self, response_proto):
"""Constructor.
Args:
response_proto: the URLFetchResponse proto buffer to wrap.
"""
self.__pb = response_proto
self.content = response_proto.content()
self.status_code = response_proto.statuscode()
self.content_was_truncated = response_proto.contentwastruncated()
self.final_url = response_proto.finalurl() or None
self.header_msg = httplib.HTTPMessage(
StringIO.StringIO(''.join(['%s: %s\n' % (h.key(), h.value())
for h in response_proto.header_list()] + ['\n'])))
self.headers = _CaselessDict(self.header_msg.items())
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
headers = httplib.HTTPMessage(StringIO(data))
response = urllib.addinfourl(StringIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response
def http_message(mapping):
"""
>>> http_message({"Content-Type": "text/html"}).items()
[('content-type', 'text/html')]
"""
f = []
for kv in mapping.items():
f.append("%s: %s" % kv)
f.append("")
msg = httplib.HTTPMessage(StringIO.StringIO("\r\n".join(f)))
return msg
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
headers = httplib.HTTPMessage(StringIO(data))
response = urllib.addinfourl(StringIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response
def __init__(self, response_proto):
"""Constructor.
Args:
response_proto: the URLFetchResponse proto buffer to wrap.
"""
self.__pb = response_proto
self.content = response_proto.content()
self.status_code = response_proto.statuscode()
self.content_was_truncated = response_proto.contentwastruncated()
self.final_url = response_proto.finalurl() or None
self.header_msg = httplib.HTTPMessage(
StringIO.StringIO(''.join(['%s: %s\n' % (h.key(), h.value())
for h in response_proto.header_list()] + ['\n'])))
self.headers = _CaselessDict(self.header_msg.items())
def __init__(self, cacheLocation,url,setCacheHeader=True):
self.cacheLocation = cacheLocation
hash = hashlib.md5(url).hexdigest()
StringIO.StringIO.__init__(self, file(self.cacheLocation + "/" + hash+".body").read())
self.url = url
self.code = 200
self.msg = "OK"
headerbuf = file(self.cacheLocation + "/" + hash+".headers").read()
if setCacheHeader:
headerbuf += "x-cache: %s/%s\r\n" % (self.cacheLocation,hash)
self.headers = httplib.HTTPMessage(StringIO.StringIO(headerbuf))