def test_open_novisit(self):
    def test_state(br):
        self.assert_(br.request is None)
        self.assert_(br.response() is None)
        self.assertRaises(mechanize.BrowserStateError, br.back)
    test_state(self.browser)

    uri = urljoin(self.uri, "test_fixtures")
    # note this involves a redirect, which should itself be non-visiting
    r = self.browser.open_novisit(uri)
    test_state(self.browser)
    self.assert_("GeneralFAQ.html" in r.read(2048))

    # Request argument instead of URL
    r = self.browser.open_novisit(mechanize.Request(uri))
    test_state(self.browser)
    self.assert_("GeneralFAQ.html" in r.read(2048))

Source-code examples for the Python Request() class

def parse_file_ex(file,
                  base_uri,
                  select_default=False,
                  request_class=mechanize.Request,
                  encoding=None,
                  backwards_compat=False,
                  add_global=True):
    raw = file.read()
    root = content_parser(raw, transport_encoding=encoding)
    forms, global_form = _form.parse_forms(
        root,
        base_uri,
        select_default=select_default,
        request_class=request_class)
    if not add_global:
        return list(forms)
    return [global_form] + list(forms)

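A minimal usage sketch for parse_file_ex, assuming it is defined in a module where mechanize, content_parser and _form are already imported (as the definition above requires); the HTML and base URI are illustrative only.

# Hypothetical call to parse_file_ex; the file-like object and base URI are
# placeholders, and the helper's own imports are assumed to be in scope.
import io

html = (b'<html><body>'
        b'<form action="/search" method="get">'
        b'<input type="text" name="q" value="spam" />'
        b'</form></body></html>')

forms = parse_file_ex(io.BytesIO(html), "http://example.com/",
                      encoding="utf-8")
# With add_global=True (the default), the first entry is the "global form"
# collecting controls that sit outside any <form> element.
for form in forms:
    print(form.method, form.action)
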
def test_request_host_lc(self):
    from mechanize._clientcookie import request_host_lc
    # this request is illegal (RFC 2616, 14.2.3)
    req = Request("http://1.1.1.1/", headers={"Host": "www.acme.com:80"})
    # libwww-perl wants this response, but that seems wrong (RFC 2616,
    # section 5.2, point 1., and RFC 2965 section 1, paragraph 3)
    # assert request_host_lc(req) == "www.acme.com"
    assert request_host_lc(req) == "1.1.1.1"

    req = Request(
        "http://www.acme.com/", headers={"Host": "irrelevant.com"})
    assert request_host_lc(req) == "www.acme.com"

    # not actually sure this is a valid Request object, so maybe the check
    # for a URL with no host should be removed from request_host_lc?
    req = Request("/resource.html", headers={"Host": "www.acme.com"})
    assert request_host_lc(req) == "www.acme.com"

    # the port shouldn't be in the request-host
    req = Request(
        "http://www.acme.com:2345/resource.html",
        headers={"Host": "www.acme.com:5432"})
    assert request_host_lc(req) == "www.acme.com"

    # the _lc function lower-cases the result
    req = Request("http://EXAMPLE.com")
    assert request_host_lc(req) == "example.com"

def test_netscape_misc(self):
    # Some additional Netscape cookies tests.
    from mechanize import CookieJar, Request

    c = CookieJar()
    headers = []
    req = Request("http://foo.bar.acme.com/foo")

    # Netscape allows a host part that contains dots
    headers.append("Set-Cookie: Customer=WILE_E_COYOTE; domain=.acme.com")
    res = FakeResponse(headers, "http://www.acme.com/foo")
    c.extract_cookies(res, req)

    # and that the domain is the same as the host without adding a leading
    # dot to the domain.  Should not quote even if strange chars are used
    # in the cookie value.
    headers.append("Set-Cookie: PART_NUMBER=3,4; domain=foo.bar.acme.com")
    res = FakeResponse(headers, "http://www.acme.com/foo")
    c.extract_cookies(res, req)

    req = Request("http://foo.bar.acme.com/foo")
    c.add_cookie_header(req)
    assert (req.get_header("Cookie").find("PART_NUMBER=3,4") != -1 and
            req.get_header("Cookie").find("Customer=WILE_E_COYOTE") != -1)

def handle(self, fn_name, action, *args, **kwds):
    self.parent.calls.append((self, fn_name, args, kwds))
    if action is None:
        return None
    elif action == "return self":
        return self
    elif action == "return response":
        res = MockResponse(200, "OK", {}, "")
        return res
    elif action == "return request":
        return Request("http://blah/")
    elif action.startswith("error"):
        code = action[action.rfind(" ") + 1:]
        try:
            code = int(code)
        except ValueError:
            pass
        res = MockResponse(200, "OK", {}, "")
        return self.parent.error("http", args[0], res, code, "", {})
    elif action == "raise":
        raise mechanize.URLError("blah")
    assert False

def test_raise(self):
    # raising URLError stops processing of request
    o = OpenerDirector()
    meth_spec = [
        [("http_open", "raise")],
        [("http_open", "return self")],
    ]
    handlers = add_ordered_mock_handlers(o, meth_spec)

    req = Request("http://example.com/")
    self.assertRaises(mechanize.URLError, o.open, req)
    self.assertEqual(o.calls, [(handlers[0], "http_open", (req, ), {})])

# def test_error(self):
#     XXX this doesn't actually seem to be used in standard library,
#     but should really be tested anyway...

def test_http_error(self):
    # XXX http_error_default
    # http errors are a special case
    o = OpenerDirector()
    meth_spec = [
        [("http_open", "error 302")],
        [("http_error_400", "raise"), "http_open"],
        [("http_error_302", "return response"), "http_error_303",
         "http_error"],
        [("http_error_302")],
    ]
    handlers = add_ordered_mock_handlers(o, meth_spec)

    req = Request("http://example.com/")
    r = o.open(req)
    assert len(o.calls) == 2
    calls = [(handlers[0], "http_open", (req, )),
             (handlers[2], "http_error_302",
              (req, AlwaysEqual(), 302, "", {}))]
    for expected, got in zip(calls, o.calls):
        handler, method_name, args = expected
        self.assertEqual((handler, method_name), got[:2])
        self.assertEqual(args, got[2])

def test_errors(self):
    h = HTTPErrorProcessor()
    o = h.parent = MockOpener()

    req = Request("http://example.com")

    # all 2xx are passed through
    r = mechanize._response.test_response()
    newr = h.http_response(req, r)
    self.assertTrue(r is newr)
    self.assertTrue(not hasattr(o, "proto"))  # o.error not called
    r = mechanize._response.test_response(code=202, msg="Accepted")
    newr = h.http_response(req, r)
    self.assertTrue(r is newr)
    self.assertTrue(not hasattr(o, "proto"))  # o.error not called
    r = mechanize._response.test_response(code=206, msg="Partial content")
    newr = h.http_response(req, r)
    self.assertTrue(r is newr)
    self.assertTrue(not hasattr(o, "proto"))  # o.error not called

    # anything else calls o.error (and MockOpener returns None, here)
    r = mechanize._response.test_response(code=502, msg="Bad gateway")
    self.assertTrue(h.http_response(req, r) is None)
    self.assertEqual(o.proto, "http")  # o.error called
    self.assertEqual(o.args, (req, r, 502, "Bad gateway", AlwaysEqual()))

def test_referer(self):
    h = HTTPRefererProcessor()
    o = h.parent = MockOpener()

    # normal case
    url = "http://example.com/"
    req = Request(url)
    r = MockResponse(200, "OK", {}, "", url)
    newr = h.http_response(req, r)
    self.assert_(r is newr)
    self.assert_(h.referer == url)
    newreq = h.http_request(req)
    self.assert_(req is newreq)
    self.assert_(req.unredirected_hdrs["Referer"] == url)

    # don't clobber existing Referer
    ref = "http://set.by.user.com/"
    req.add_unredirected_header("Referer", ref)
    newreq = h.http_request(req)
    self.assert_(req is newreq)
    self.assert_(req.unredirected_hdrs["Referer"] == ref)

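The Referer handling exercised here is also reachable through mechanize's high-level API; a minimal sketch, assuming the default Browser handler set:

# Sketch only: Browser.set_handle_referer controls the HTTPRefererProcessor
# tested above, so the previously visited page fills in the Referer header
# unless the caller has already set one.
import mechanize

br = mechanize.Browser()
br.set_handle_referer(True)  # enabled by default in mechanize.Browser
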
def test_http_equiv(self):
    h = HTTPEquivProcessor()
    o = h.parent = MockOpener()

    data = ('<html><head>'
            '<meta http-equiv="Refresh" content="spam&eggs">'
            '</head></html>')
    headers = [
        ("Foo", "Bar"),
        ("Content-type", "text/html"),
        ("Refresh", "blah"),
    ]
    url = "http://example.com/"
    req = Request(url)
    r = mechanize._response.make_response(data, headers, url, 200, "OK")
    newr = h.http_response(req, r)

    new_headers = newr.info()
    self.assertEqual(new_headers["Foo"], "Bar")
    self.assertEqual(new_headers["Refresh"], "spam&eggs")
    self.assertEqual(
        new_headers.getheaders("Refresh"), ["blah", "spam&eggs"])

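As a usage note (not part of the test), this processor is what Browser.set_handle_equiv toggles; a minimal sketch:

# Sketch: with HTTP-EQUIV handling enabled, <meta http-equiv=...> tags in
# the body are surfaced as response headers, which the test above asserts
# on the response object directly.
import mechanize

br = mechanize.Browser()
br.set_handle_equiv(True)  # on by default
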
def test_redirect_bad_uri(self):
    # bad URIs should be cleaned up before redirection
    from mechanize._response import test_html_response
    from_url = "http://example.com/a.html"
    bad_to_url = "http://example.com/b. |html"
    good_to_url = "http://example.com/b.%20%7Chtml"

    h = HTTPRedirectHandler()
    o = h.parent = MockOpener()

    req = Request(from_url)
    h.http_error_302(req, test_html_response(), 302, "Blah",
                     http_message({"location": bad_to_url}))
    self.assertEqual(o.req.get_full_url(), good_to_url)

def test_refresh_bad_uri(self):
    # bad URIs should be cleaned up before redirection
    from mechanize._response import test_html_response
    from_url = "http://example.com/a.html"
    bad_to_url = "http://example.com/b. |html"
    good_to_url = "http://example.com/b.%20%7Chtml"

    h = HTTPRefreshProcessor(max_time=None, honor_time=False)
    o = h.parent = MockOpener()

    req = Request("http://example.com/")
    r = test_html_response(
        headers=[("refresh", '0; url="%s"' % bad_to_url)])
    newr = h.http_response(req, r)
    headers = o.args[-1]
    self.assertEqual(headers["Location"], good_to_url)

def test_proxy(self):
    o = OpenerDirector()
    ph = mechanize.ProxyHandler(dict(http="proxy.example.com:3128"))
    o.add_handler(ph)
    meth_spec = [[("http_open", "return response")]]
    handlers = add_ordered_mock_handlers(o, meth_spec)
    o._maybe_reindex_handlers()

    req = Request("http://acme.example.com/")
    self.assertEqual(req.get_host(), "acme.example.com")
    r = o.open(req)
    self.assertEqual(req.get_host(), "proxy.example.com:3128")
    self.assertEqual([(handlers[0], "http_open")],
                     [tup[0:2] for tup in o.calls])

def test_proxy_https_proxy_authorization(self):
    o = OpenerDirector()
    ph = mechanize.ProxyHandler(dict(https='proxy.example.com:3128'))
    o.add_handler(ph)
    https_handler = MockHTTPSHandler()
    o.add_handler(https_handler)

    req = Request("https://www.example.com/")
    req.add_header("Proxy-Authorization", "FooBar")
    req.add_header("User-Agent", "Grail")
    self.assertEqual(req.get_host(), "www.example.com")
    self.assertIsNone(req._tunnel_host)
    r = o.open(req)

    # Verify Proxy-Authorization gets tunneled to request.
    # httpsconn req_headers do not have the Proxy-Authorization header but
    # the req will have.
    self.assertFalse(("Proxy-Authorization",
                      "FooBar") in https_handler.httpconn.req_headers)
    self.assertTrue(
        ("User-Agent", "Grail") in https_handler.httpconn.req_headers)
    self.assertIsNotNone(req._tunnel_host)
    self.assertEqual(req.get_host(), "proxy.example.com:3128")
    self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")

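Outside the test harness, the same ProxyHandler wiring is usually reached through the Browser API; a minimal sketch with placeholder proxy addresses:

# Sketch: Browser.set_proxies installs a mechanize.ProxyHandler like the one
# built by hand above; the host rewrite it performs is what test_proxy
# checks via req.get_host().
import mechanize

br = mechanize.Browser()
br.set_proxies({"http": "proxy.example.com:3128",
                "https": "proxy.example.com:3128"})
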
def get(self, url, headers={}):
    "Mechanize GET request"
    browser = self.get_browser()
    request_headers = []
    response = {}
    error = {}
    for key, value in headers.iteritems():
        request_headers.append((key, value))
    browser.addheaders = request_headers
    try:
        response = browser.open(mechanize.Request(url))
        response = json.loads(response.read())
    except (mechanize.HTTPError, mechanize.URLError) as e:
        error = e
        if isinstance(e, mechanize.HTTPError):
            error_message = e.read()
            print("\n******\nGET Error: %s %s" % (url, error_message))
        else:
            print(e.reason.args)
        # bubble the error back up after printing the relevant details
        raise e
    return {'response': response, 'error': error}

def post(self, url, data=None, headers={}):
    "Mechanize POST request"
    browser = self.get_browser()
    response = {}
    error = {}
    try:
        response = browser.open(mechanize.Request(
            url=url, data=data, headers=headers))
    except (mechanize.HTTPError, mechanize.URLError) as e:
        error = e
        if isinstance(e, mechanize.HTTPError):
            error_message = e.read()
            print("\n******\nPOST Error: %s %s %s" %
                  (url, error_message, str(data)))
        else:
            print(e.reason.args)
        # bubble the error back up after printing the relevant details
        raise e
    return {'response': response, 'error': error}

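A hypothetical caller for the get()/post() wrappers above, assuming they are methods of a small Python 2 client class (named ApiClient here purely for illustration; that name and its get_browser() method are not shown in the source):

# Hypothetical usage; ApiClient is an assumed wrapper class exposing the
# get()/post() helpers defined above.
import urllib

client = ApiClient()
result = client.get("http://example.com/api/items",
                    headers={"Accept": "application/json"})
print(result['response'])

payload = urllib.urlencode({"name": "spam"})
client.post("http://example.com/api/items", data=payload,
            headers={"Content-Type": "application/x-www-form-urlencoded"})
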
def add_cookie_header(self, request):
    """Add correct Cookie: header to request (mechanize.Request object).

    The Cookie2 header is also added unless policy.hide_cookie2 is true.

    The request object (usually a mechanize.Request instance) must support
    the methods get_full_url, get_host, is_unverifiable, get_type,
    has_header, get_header, header_items and add_unredirected_header, as
    documented by urllib2.
    """
    debug("add_cookie_header")
    cookies = self.cookies_for_request(request)

    attrs = self._cookie_attrs(cookies)
    if attrs:
        if not request.has_header("Cookie"):
            request.add_unredirected_header("Cookie", "; ".join(attrs))

    # if necessary, advertise that we know RFC 2965
    if self._policy.rfc2965 and not self._policy.hide_cookie2:
        for cookie in cookies:
            if cookie.version != 1 and not request.has_header("Cookie2"):
                request.add_unredirected_header("Cookie2", '$Version="1"')
                break

    self.clear_expired_cookies()

def extract_cookies(self, response, request):
    """Extract cookies from response, where allowable given the request.

    Look for allowable Set-Cookie: and Set-Cookie2: headers in the response
    object passed as argument.  Any of these headers that are found are
    used to update the state of the object (subject to the policy.set_ok
    method's approval).

    The response object (usually the result of a call to mechanize.urlopen,
    or similar) should support an info method, which returns a
    mimetools.Message object (in fact, the 'mimetools.Message object' may
    be any object that provides a getheaders method).

    The request object (usually a mechanize.Request instance) must support
    the methods get_full_url, get_type, get_host, and is_unverifiable, as
    documented by mechanize, and the port attribute (the port number).  The
    request is used to set default values for cookie-attributes as well as
    for checking that the cookie is OK to be set.
    """
    debug("extract_cookies: %s", response.info())
    self._policy._now = self._now = int(time.time())

    for cookie in self._make_cookies(response, request):
        if cookie.expires is not None and cookie.expires <= self._now:
            # An expiry date in the past is a request to delete the cookie.
            # This can't be done in DefaultCookiePolicy, because cookies
            # can't be deleted there.
            try:
                self.clear(cookie.domain, cookie.path, cookie.name)
            except KeyError:
                pass
            debug("Expiring cookie, domain='%s', path='%s', name='%s'",
                  cookie.domain, cookie.path, cookie.name)
        elif self._policy.set_ok(cookie, request):
            debug(" setting cookie: %s", cookie)
            self.set_cookie(cookie)

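A minimal sketch of the round trip these two methods implement, using mechanize's public CookieJar API; the URLs are placeholders:

# Sketch: cookies are captured from one response and replayed on a later
# request. mechanize.HTTPCookieProcessor wires a CookieJar into an opener,
# calling extract_cookies()/add_cookie_header() automatically.
import mechanize

jar = mechanize.CookieJar()
opener = mechanize.build_opener(mechanize.HTTPCookieProcessor(jar))

response = opener.open("http://example.com/login")  # Set-Cookie captured here

request = mechanize.Request("http://example.com/account")
jar.add_cookie_header(request)  # adds the Cookie: header, if any apply
print(request.get_header("Cookie"))
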
def test_redirect_with_timeout(self):
    timeout_log = self._monkey_patch_socket()
    timeout = 10.
    # 301 redirect due to missing final '/'
    req = mechanize.Request(urljoin(self.test_uri, "test_fixtures"),
                            timeout=timeout)
    r = self.browser.open(req)
    self.assert_("GeneralFAQ.html" in r.read(2048))
    timeout_log.verify(timeout)

def test_retrieve(self):
    # not passing an explicit filename downloads to a temporary file
    # using a Request object instead of a URL works
    url = urljoin(self.uri, "/mechanize/")
    opener = self.build_opener()
    verif = CallbackVerifier(self)
    request = mechanize.Request(url)
    filename, headers = opener.retrieve(request, reporthook=verif.callback)
    self.assertEquals(request.visit, False)
    self._check_retrieve(url, filename, headers)
    opener.close()
    # closing the opener removed the temporary file
    self.failIf(os.path.isfile(filename))

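A usage sketch of retrieve() outside the test fixture, assuming a plain mechanize opener is sufficient; the URL is a placeholder:

# Sketch: retrieve() downloads to a temporary file when no filename is
# given and returns (filename, headers); closing the opener removes the
# temporary file, as the test above verifies.
import mechanize

opener = mechanize.build_opener()
filename, headers = opener.retrieve("http://example.com/mechanize/")
print(filename)
opener.close()
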
def test_domain_return_ok(self):
    # test optimization: .domain_return_ok() should filter out most
    # domains in the CookieJar before we try to access them (because that
    # may require disk access -- in particular, with MSIECookieJar)
    # This is only a rough check for performance reasons, so it's not too
    # critical as long as it's sufficiently liberal.
    import mechanize
    pol = mechanize.DefaultCookiePolicy()
    for url, domain, ok in [
            ("http://foo.bar.com/", "blah.com", False),
            ("http://foo.bar.com/", "rhubarb.blah.com", False),
            ("http://foo.bar.com/", "rhubarb.foo.bar.com", False),
            ("http://foo.bar.com/", ".foo.bar.com", True),
            ("http://foo.bar.com/", "foo.bar.com", True),
            ("http://foo.bar.com/", ".bar.com", True),
            ("http://foo.bar.com/", "com", True),
            ("http://foo.com/", "rhubarb.foo.com", False),
            ("http://foo.com/", ".foo.com", True),
            ("http://foo.com/", "foo.com", True),
            ("http://foo.com/", "com", True),
            ("http://foo/", "rhubarb.foo", False),
            ("http://foo/", ".foo", True),
            ("http://foo/", "foo", True),
            ("http://foo/", "foo.local", True),
            ("http://foo/", ".local", True),
    ]:
        request = mechanize.Request(url)
        r = pol.domain_return_ok(domain, request)
        if ok:
            self.assert_(r)
        else:
            self.assert_(not r)

def test_request_path(self):
    from mechanize._clientcookie import request_path
    # with parameters
    req = Request("http://www.example.com/rheum/rhaponticum;"
                  "foo=bar;sing=song?apples=pears&spam=eggs#ni")
    self.assertEquals(
        request_path(req), "/rheum/rhaponticum;foo=bar;sing=song")
    # without parameters
    req = Request("http://www.example.com/rheum/rhaponticum?"
                  "apples=pears&spam=eggs#ni")
    self.assertEquals(request_path(req), "/rheum/rhaponticum")
    # missing final slash
    req = Request("http://www.example.com")
    self.assert_(request_path(req) == "/")

def test_request_port(self):
    from mechanize._clientcookie import request_port, DEFAULT_HTTP_PORT
    req = Request(
        "http://www.acme.com:1234/", headers={"Host": "www.acme.com:4321"})
    assert request_port(req) == "1234"
    req = Request(
        "http://www.acme.com/", headers={"Host": "www.acme.com:4321"})
    assert request_port(req) == DEFAULT_HTTP_PORT

def test_effective_request_host(self):
    from mechanize import effective_request_host
    self.assertEquals(
        effective_request_host(Request("http://www.EXAMPLE.com/spam")),
        "www.example.com")
    self.assertEquals(
        effective_request_host(Request("http://bob/spam")), "bob.local")

def test_domain_block(self):
    from mechanize import CookieJar, DefaultCookiePolicy
    # import logging; logging.getLogger("mechanize").setLevel(logging.DEBUG)

    pol = DefaultCookiePolicy(rfc2965=True, blocked_domains=[".acme.com"])
    c = CookieJar(policy=pol)
    headers = ["Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/"]

    req = Request("http://www.acme.com/")
    res = FakeResponse(headers, "http://www.acme.com/")
    c.extract_cookies(res, req)
    assert len(c) == 0

    pol.set_blocked_domains(["acme.com"])
    c.extract_cookies(res, req)
    assert len(c) == 1

    c.clear()
    req = Request("http://www.roadrunner.net/")
    res = FakeResponse(headers, "http://www.roadrunner.net/")
    c.extract_cookies(res, req)
    assert len(c) == 1
    req = Request("http://www.roadrunner.net/")
    c.add_cookie_header(req)
    assert (req.has_header("Cookie") and req.has_header("Cookie2"))

    c.clear()
    pol.set_blocked_domains([".acme.com"])
    c.extract_cookies(res, req)
    assert len(c) == 1

    # set a cookie with blocked domain...
    req = Request("http://www.acme.com/")
    res = FakeResponse(headers, "http://www.acme.com/")
    cookies = c.make_cookies(res, req)
    c.set_cookie(cookies[0])
    assert len(c) == 2
    # ... and check it doesn't get returned
    c.add_cookie_header(req)
    assert not req.has_header("Cookie")

def test_missing_final_slash(self):
    # Missing slash from request URL's abs_path should be assumed present.
    from mechanize import CookieJar, Request, DefaultCookiePolicy
    url = "http://www.acme.com"
    c = CookieJar(DefaultCookiePolicy(rfc2965=True))
    interact_2965(c, url, "foo=bar; Version=1")
    req = Request(url)
    assert len(c) == 1
    c.add_cookie_header(req)
    assert req.has_header("Cookie")