def test_http_doubleslash(self):
    # Checks that the presence of an unnecessary double slash in a url
    # doesn't break anything. Previously, a double slash directly after
    # the host could cause incorrect parsing of the url.
    h = urllib2.AbstractHTTPHandler()
    o = h.parent = MockOpener()
    data = ""
    ds_urls = [
        "http://example.com/foo/bar/baz.html",
        "http://example.com//foo/bar/baz.html",
        "http://example.com/foo//bar/baz.html",
        "http://example.com/foo/bar//baz.html",
    ]
    for ds_url in ds_urls:
        ds_req = Request(ds_url, data)

        # Check whether host is determined correctly if there is no proxy
        np_ds_req = h.do_request_(ds_req)
        self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com")

        # Check whether host is determined correctly if there is a proxy
        ds_req.set_proxy("someproxy:3128", None)
        p_ds_req = h.do_request_(ds_req)
        self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com")
def test_invalid_redirect(self):
    from_url = "http://example.com/a.html"
    valid_schemes = ['http', 'https', 'ftp']
    invalid_schemes = ['file', 'imap', 'ldap']
    schemeless_url = "example.com/b.html"
    h = urllib2.HTTPRedirectHandler()
    o = h.parent = MockOpener()
    req = Request(from_url)
    req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT

    for scheme in invalid_schemes:
        invalid_url = scheme + '://' + schemeless_url
        self.assertRaises(urllib2.HTTPError, h.http_error_302,
                          req, MockFile(), 302, "Security Loophole",
                          MockHeaders({"location": invalid_url}))

    for scheme in valid_schemes:
        valid_url = scheme + '://' + schemeless_url
        h.http_error_302(req, MockFile(), 302, "That's fine",
                         MockHeaders({"location": valid_url}))
        self.assertEqual(o.req.get_full_url(), valid_url)
def new_document(self, content, format="html", title=None, member_ids=[]):
    """Creates a new document from the given content.

    To create a document in a folder, include the folder ID in the list
    of member_ids, e.g.,

        client = quip.QuipClient(...)
        user = client.get_authenticated_user()
        client.new_document(..., member_ids=[user["private_folder_id"]])

    """
    return self._fetch_json("threads/new-document", post_data={
        "content": content,
        "format": format,
        "title": title,
        "member_ids": ",".join(member_ids),
    })
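
A minimal usage sketch, assuming the client is constructed with an access token (as the `access_token` attribute used by `get_blob` below suggests); the token and folder ID are placeholders:

import quip

client = quip.QuipClient(access_token="YOUR_ACCESS_TOKEN")  # placeholder token
doc = client.new_document(
    "<h1>Weekly notes</h1><p>First entry.</p>",
    format="html",
    title="Weekly notes",
    member_ids=["PRIVATE_FOLDER_ID"],  # placeholder folder ID the document should live in
)
# The response is assumed to contain a "thread" object describing the new document.
print(doc["thread"]["id"])
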
def edit_document(self, thread_id, content, operation=APPEND, format="html",
                  section_id=None, **kwargs):
    """Edits the given document, adding the given content.

    `operation` should be one of the constants described above. If
    `operation` is relative to another section of the document, you must
    also specify the `section_id`.
    """
    args = {
        "thread_id": thread_id,
        "content": content,
        "location": operation,
        "format": format,
        "section_id": section_id,
    }
    args.update(kwargs)
    return self._fetch_json("threads/edit-document", post_data=args)
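
A sketch of the two common cases, assuming the operation constants the docstring refers to (e.g. `APPEND`, `AFTER_SECTION`) are defined on the client class; the thread and section IDs are placeholders:

# Append to the end of the document (the default operation).
client.edit_document("THREAD_ID", "<p>Appended at the end.</p>")

# Insert relative to an existing section; AFTER_SECTION is an assumed
# constant name and requires section_id to be set.
client.edit_document(
    "THREAD_ID",
    "<p>Inserted after a specific section.</p>",
    operation=quip.QuipClient.AFTER_SECTION,
    section_id="SECTION_ID",
)
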
def setProxy(self, host, type='http'):
    """
    Set the proxy for all subsequent requests to use.

    @param host: the proxy host, e.g. C{"localhost:3128"}
    @type host: C{string}
    @param type: the proxy scheme
    @type type: C{string}
    @see: U{The Python Docs<http://docs.python.org/library/urllib2.html#urllib2.Request.set_proxy>}
    """
    self.proxy_args = (host, type)
def test_redirect_fragment(self):
    redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
    hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
    hdeh = urllib2.HTTPDefaultErrorHandler()
    hrh = urllib2.HTTPRedirectHandler()
    o = build_test_opener(hh, hdeh, hrh)
    fp = o.open('http://www.example.com')
    self.assertEqual(fp.geturl(), redirected_url.strip())

def test_url_fragment(self):
    req = Request("http://www.python.org/?qs=query#fragment=true")
    self.assertEqual("/?qs=query", req.get_selector())
    req = Request("http://www.python.org/#fun=true")
    self.assertEqual("/", req.get_selector())

    # Issue 11703: geturl() omits fragment in the original URL.
    url = 'http://docs.python.org/library/urllib2.html#OK'
    req = Request(url)
    self.assertEqual(req.get_full_url(), url)
def copy_document(self, id, title=None, member_ids=[]):
    """Creates a new document from the given thread ID.

    To create it in a folder, include the folder ID in member_ids.
    """
    old_thread = self.get_thread(id)
    return self.new_document(
        old_thread["html"], title=title or old_thread["thread"]["title"],
        member_ids=member_ids)
def get_section(self, section_id, thread_id=None, document_html=None):
    """Returns the document element with the given section ID, or None if
    it cannot be found. Accepts either a thread ID or raw document HTML.
    """
    if not document_html:
        document_html = self.get_thread(thread_id).get("html")
    if not document_html:
        return None
    tree = self.parse_document_html(document_html)
    elements = list(tree.iterfind(".//*[@id='%s']" % section_id))
    if not elements:
        return None
    return elements[0]
def _get_container(self, thread_id, document_html, container, index):
    """Returns the `index`-th element with the given tag name (e.g. "ul")
    in the document, or None if no such element exists.
    """
    if not document_html:
        document_html = self.get_thread(thread_id).get("html")
    if not document_html:
        return None
    tree = self.parse_document_html(document_html)
    lists = list(tree.iter(container))
    if not lists:
        return None
    try:
        return lists[index]
    except IndexError:
        return None
def parse_document_html(self, document_html):
    """Returns an `ElementTree` for the given Quip document HTML."""
    document_xml = "<html>" + document_html + "</html>"
    return xml.etree.cElementTree.fromstring(document_xml.encode("utf-8"))
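
Because the helper wraps the fragment in a synthetic `<html>` root, callers can walk the result with the standard `ElementTree` API. A small sketch, assuming a thread fetched with `get_thread` whose payload carries an `"html"` key (as `copy_document` above relies on); the thread ID is a placeholder:

# List the text of every <h1> heading in a thread's document.
thread = client.get_thread("THREAD_ID")
tree = client.parse_document_html(thread["html"])
for heading in tree.iter("h1"):
    # itertext() concatenates the heading's text and that of inline children.
    print("".join(heading.itertext()))
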
def get_blob(self, thread_id, blob_id):
    """Returns a file-like object with the contents of the given blob from
    the given thread.

    The object is described in detail here:
    https://docs.python.org/2/library/urllib2.html#urllib2.urlopen
    """
    request = urllib2.Request(
        url=self._url("blob/%s/%s" % (thread_id, blob_id)))
    if self.access_token:
        request.add_header("Authorization", "Bearer " + self.access_token)
    try:
        return urllib2.urlopen(request, timeout=self.request_timeout)
    except urllib2.HTTPError, error:
        try:
            # Extract the developer-friendly error message from the response
            message = json.loads(error.read())["error_description"]
        except Exception:
            raise error
        if (self.retry_rate_limit and error.code == 503 and
                message == "Over Rate Limit"):
            # Retry later.
            reset_time = float(error.headers.get("X-RateLimit-Reset"))
            delay = max(2, reset_time - time.time() + 1)
            logging.warning("Rate Limit, delaying for %d seconds" % delay)
            time.sleep(delay)
            return self.get_blob(thread_id, blob_id)
        else:
            raise QuipError(error.code, message, error)
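
Since the return value is the file-like object from `urllib2.urlopen`, it can be streamed straight to disk. A sketch with placeholder IDs; a real blob hash would typically come from a message's `"files"` entries, as `merge_comments` below illustrates:

import shutil

blob = client.get_blob("THREAD_ID", "BLOB_HASH")  # placeholder IDs
with open("attachment.bin", "wb") as out:
    shutil.copyfileobj(blob, out)  # stream the HTTP response body to a local file
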
def merge_comments(self, original_id, children_ids):
    """Given an original document and a set of exact duplicates, copies
    all comments and messages on the duplicates to the original.
    """
    import re
    threads = self.get_threads(children_ids + [original_id])
    original_section_ids = re.findall(r" id='([a-zA-Z0-9]{11})'",
                                      threads[original_id]["html"])
    for thread_id in children_ids:
        thread = threads[thread_id]
        child_section_ids = re.findall(r" id='([a-zA-Z0-9]{11})'",
                                       thread["html"])
        # Sections appear in the same order in an exact duplicate, so a
        # positional zip maps each child section to its original section.
        parent_map = dict(zip(child_section_ids, original_section_ids))
        messages = self.get_messages(thread_id)
        for message in reversed(messages):
            kwargs = {}
            if "parts" in message:
                kwargs["parts"] = json.dumps(message["parts"])
            else:
                kwargs["content"] = message["text"]
            if "annotation" in message:
                # Find the section the comment was anchored to, either via
                # highlight_section_ids or by scanning the document HTML.
                section_id = None
                if "highlight_section_ids" in message["annotation"]:
                    section_id = message["annotation"][
                        "highlight_section_ids"][0]
                else:
                    anno_loc = thread["html"].find(
                        '<annotation id="%s"' % message["annotation"]["id"])
                    loc = thread["html"].rfind("id=", 0, anno_loc)
                    if anno_loc >= 0 and loc >= 0:
                        section_id = thread["html"][loc+4:loc+15]
                if section_id and section_id in parent_map:
                    kwargs["section_id"] = parent_map[section_id]
            if "files" in message:
                # Re-upload any attachments so they belong to the original
                # thread before re-posting the message.
                attachments = []
                for blob_info in message["files"]:
                    blob = self.get_blob(thread_id, blob_info["hash"])
                    new_blob = self.put_blob(
                        original_id, blob, name=blob_info["name"])
                    attachments.append(new_blob["id"])
                if attachments:
                    kwargs["attachments"] = ",".join(attachments)
            self.new_message(original_id, **kwargs)
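
A usage sketch with placeholder IDs; the children must be exact duplicates of the original for the positional section mapping to hold:

client.merge_comments(
    original_id="ORIGINAL_THREAD_ID",
    children_ids=["COPY_THREAD_ID_1", "COPY_THREAD_ID_2"],
)
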