def test_response_headers(self):
# test response with multiple message headers with the same field name.
text = ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: Customer="WILE_E_COYOTE";'
' Version="1"; Path="/acme"\r\n'
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
' Path="/acme"\r\n'
'\r\n'
'No body\r\n')
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
', '
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
s = FakeSocket(text)
r = httplib.HTTPResponse(s)
r.begin()
cookies = r.getheader("Set-Cookie")
if cookies != hdr:
self.fail("multiple headers not combined properly")
python类HTTP的实例源码
def test_chunked_head(self):
chunked_start = (
'HTTP/1.1 200 OK\r\n'
'Transfer-Encoding: chunked\r\n\r\n'
'a\r\n'
'hello world\r\n'
'1\r\n'
'd\r\n'
)
sock = FakeSocket(chunked_start + '0\r\n')
resp = httplib.HTTPResponse(sock, method="HEAD")
resp.begin()
self.assertEqual(resp.read(), '')
self.assertEqual(resp.status, 200)
self.assertEqual(resp.reason, 'OK')
self.assertTrue(resp.isclosed())
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
fp.close()
# In case the server sent a relative URL, join with original:
newurl = basejoin(self.type + ":" + url, newurl)
# For security reasons we do not allow redirects to protocols
# other than HTTP, HTTPS or FTP.
newurl_lower = newurl.lower()
if not (newurl_lower.startswith('http://') or
newurl_lower.startswith('https://') or
newurl_lower.startswith('ftp://')):
raise IOError('redirect error', errcode,
errmsg + " - Redirection to url '%s' is not allowed" %
newurl,
headers)
return self.open(newurl)
def make_connection(self, host):
self.realhost = host
proxies = urllib.getproxies()
proxyurl = None
if 'http' in proxies:
proxyurl = proxies['http']
elif 'all' in proxies:
proxyurl = proxies['all']
if proxyurl:
urltype, proxyhost = urllib.splittype(proxyurl)
host, selector = urllib.splithost(proxyhost)
h = httplib.HTTP(host)
self.proxy_is_used = True
return h
else:
self.proxy_is_used = False
return Transport.make_connection(self, host)
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
fp.close()
# In case the server sent a relative URL, join with original:
newurl = basejoin(self.type + ":" + url, newurl)
# For security reasons we do not allow redirects to protocols
# other than HTTP, HTTPS or FTP.
newurl_lower = newurl.lower()
if not (newurl_lower.startswith('http://') or
newurl_lower.startswith('https://') or
newurl_lower.startswith('ftp://')):
raise IOError('redirect error', errcode,
errmsg + " - Redirection to url '%s' is not allowed" %
newurl,
headers)
return self.open(newurl)
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
fp.close()
# In case the server sent a relative URL, join with original:
newurl = basejoin(self.type + ":" + url, newurl)
# For security reasons we do not allow redirects to protocols
# other than HTTP, HTTPS or FTP.
newurl_lower = newurl.lower()
if not (newurl_lower.startswith('http://') or
newurl_lower.startswith('https://') or
newurl_lower.startswith('ftp://')):
raise IOError('redirect error', errcode,
errmsg + " - Redirection to url '%s' is not allowed" %
newurl,
headers)
return self.open(newurl)
def post_multipart(host, selector, fields, files):
"""
Post fields and files to an http host as multipart/form-data.
fields is a sequence of (name, value) elements for regular form fields.
files is a sequence of (name, filename, value) elements for data to be uploaded as files
Return the server's response page.
"""
content_type, body = encode_multipart_formdata(fields, files)
h = httplib.HTTP(host)
h.putrequest('POST', selector)
h.putheader('content-type', content_type)
h.putheader('content-length', str(len(body)))
h.endheaders()
h.send(body)
errcode, errmsg, headers = h.getreply()
return h.file.read()
def make_connection(self, host):
# create a HTTPS connection object from a host descriptor
host, extra_headers, x509 = self.get_host_info(host)
http = HTTPTLSConnection(host, None,
self.username, self.password,
self.sharedKey,
self.certChain, self.privateKey,
self.checker.cryptoID,
self.checker.protocol,
self.checker.x509Fingerprint,
self.checker.x509TrustList,
self.checker.x509CommonName,
self.settings)
http2 = httplib.HTTP()
http2._setup(http)
return http2
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
fp.close()
# In case the server sent a relative URL, join with original:
newurl = basejoin(self.type + ":" + url, newurl)
# For security reasons we do not allow redirects to protocols
# other than HTTP, HTTPS or FTP.
newurl_lower = newurl.lower()
if not (newurl_lower.startswith('http://') or
newurl_lower.startswith('https://') or
newurl_lower.startswith('ftp://')):
raise IOError('redirect error', errcode,
errmsg + " - Redirection to url '%s' is not allowed" %
newurl,
headers)
return self.open(newurl)
def make_connection(self, host):
# return an existing connection if possible. This allows
# HTTP/1.1 keep-alive.
if self._connection and host == self._connection[0]:
http = self._connection[1]
else:
# create a HTTPS connection object from a host descriptor
chost, extra_headers, x509 = self.get_host_info(host)
http = HTTPTLSConnection(chost, None,
username=self.username, password=self.password,
certChain=self.certChain, privateKey=self.privateKey,
checker=self.checker,
settings=self.settings,
ignoreAbruptClose=self.ignoreAbruptClose)
# store the host argument along with the connection object
self._connection = host, http
if not self.conn_class_is_http:
return http
http2 = httplib.HTTP()
http2._setup(http)
return http2
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
if 'location' in headers:
newurl = headers['location']
elif 'uri' in headers:
newurl = headers['uri']
else:
return
fp.close()
# In case the server sent a relative URL, join with original:
newurl = basejoin(self.type + ":" + url, newurl)
# For security reasons we do not allow redirects to protocols
# other than HTTP, HTTPS or FTP.
newurl_lower = newurl.lower()
if not (newurl_lower.startswith('http://') or
newurl_lower.startswith('https://') or
newurl_lower.startswith('ftp://')):
raise IOError('redirect error', errcode,
errmsg + " - Redirection to url '%s' is not allowed" %
newurl,
headers)
return self.open(newurl)
def send(self, event, msg, key):
id, url = key
request = self.format_request(
"%s: %s" %
(event, msg) if msg else event)
webservice = httplib.HTTP(url)
webservice.putrequest("POST", id)
webservice.putheader("Host", url)
webservice.putheader("Content-type", "text/xml")
webservice.putheader("X-NotificationClass", "2")
webservice.putheader("X-WindowsPhone-Target", "toast")
webservice.putheader("Content-length", "%d" % len(request))
webservice.endheaders()
webservice.send(request)
webservice.close()
def do_search_files(self, files):
h = httplib.HTTP(self.server)
h.putrequest(
'GET',
"search/web/results/?q=" +
self.word +
"filetype:" +
self.files +
"&elements_per_page=50&start_index=" +
self.counter)
h.putheader('Host', self.hostname)
h.putheader('User-agent', self.userAgent)
h.endheaders()
returncode, returnmsg, headers = h.getreply()
self.results = h.getfile().read()
self.totalresults += self.results
def run(dmn,file):
h = httplib.HTTP('www.google.com')
h.putrequest('GET',"/search?num=500&q=site:"+dmn+"+filetype:"+file)
h.putheader('Host', 'www.google.com')
h.putheader('User-agent', 'Internet Explorer 6.0 ')
h.putheader('Referrer', 'www.g13net.com')
h.endheaders()
returncode, returnmsg, headers = h.getreply()
data=h.getfile().read()
data=re.sub('<b>','',data)
for e in ('>','=','<','\\','(',')','"','http',':','//'):
data = string.replace(data,e,' ')
r1 = re.compile('[-_.a-zA-Z0-9.-_]*'+'\.'+file)
res = r1.findall(data)
return res
def run_search(self):
try:
con = httplib.HTTP(self.server)
con.putrequest('GET', '/search?q=%40'+self.keyword)
con.putheader('Host', self.host)
con.putheader('Cookie', 'SRCHHPGUSR=ADLT=DEMOTE&NRSLT=50')
con.putheader('Accept-Language', 'en-us,en')
con.putheader('User-agent', self.u_agent)
con.endheaders()
# return code,messagge and header
returncode, returnmsg, header = con.getreply()
self.results = con.getfile().read()
self.tresult += self.results
except Exception as err:
print "\t\t|"
print "\t\t|__"+self.r+" Server not found!!\n"+self.t
def getreply(self):
file = self.sock.makefile('rb')
data = ''.join(file.readlines())
file.close()
self.file = StringIO(data)
line = self.file.readline()
try:
[ver, code, msg] = line.split(None, 2)
except ValueError:
try:
[ver, code] = line.split(None, 1)
msg = ""
except ValueError:
return -1, line, None
if ver[:5] != 'HTTP/':
return -1, line, None
code = int(code)
msg = msg.strip()
headers = mimetools.Message(self.file, 0)
return ver, code, msg, headers
def __snd_request(self, method, uri, headers={}, body='', eh=1):
try:
h = HTTP()
h.connect(self.host, self.port)
h.putrequest(method, uri)
for n, v in headers.items():
h.putheader(n, v)
if eh:
h.endheaders()
if body:
h.send(body)
ver, code, msg, hdrs = h.getreply()
data = h.getfile().read()
h.close()
except Exception:
raise NotAvailable(sys.exc_value)
return http_response(ver, code, msg, hdrs, data)
# HTTP methods