def test_encoding(self):
    """Check Browser.encoding() against several raw header blocks.

    Each case pairs a raw header string with the encoding the browser is
    expected to report once a response carrying those headers is set.
    """
    import mechanize
    from StringIO import StringIO
    import urllib

    browser = mechanize.Browser()
    default_encoding = mechanize._html.DEFAULT_ENCODING
    # Always take the first encoding, since that's the one from the real
    # HTTP headers, rather than from HTTP-EQUIV.
    cases = [
        ("", default_encoding),
        ("Foo: Bar\r\n\r\n", default_encoding),
        ("Content-Type: text/html; charset=UTF-8\r\n\r\n", "UTF-8"),
        ("Content-Type: text/html; charset=UTF-8\r\n"
         "Content-Type: text/html; charset=KOI8-R\r\n\r\n", "UTF-8"),
    ]
    for raw_headers, expected in cases:
        header_msg = mimetools.Message(StringIO(raw_headers))
        response = urllib.addinfourl(
            StringIO(""), header_msg, "http://www.example.com/")
        browser.set_response(response)
        self.assertEqual(browser.encoding(), expected)
# Example source code using the Python StringIO() class
def test_select_form(self):
    """Exercise select_form() by index, by name and by attribute predicates.

    A three-form HTML page is installed as the browser's response; forms are
    then selected by ``nr``, by ``name``, by a regex on ``data-ac`` and by a
    callable on ``class``.
    """
    from mechanize import _response
    browser = TestBrowser()
    body = StringIO.StringIO('''<html>
<form name="a"></form>
<form name="b" data-ac="123"></form>
<form name="c" class="x"></form>
</html>''')
    header_msg = mimetools.Message(
        StringIO.StringIO("Content-type: text/html"))
    closeable = _response.closeable_response(
        body, header_msg, "http://example.com/", 200, "OK")
    browser.set_response(_response.response_seek_wrapper(closeable))

    for index, form_name in enumerate('abc'):
        browser.select_form(nr=index)
        self.assertEqual(browser.form.name, form_name)
        # Go back to the first form, then select by name.
        browser.select_form(nr=0)
        browser.select_form(name=form_name)
        self.assertEqual(browser.form.name, form_name)

    # Attribute predicates: a compiled regex and a plain callable.
    browser.select_form(data_ac=re.compile(r'\d+'))
    self.assertEqual(browser.form.name, 'b')
    browser.select_form(class_=lambda x: x == 'x')
    self.assertEqual(browser.form.name, 'c')
def test_str(self):
    """Check str(browser) before visiting, after a response, and with a form selected."""
    from mechanize import _response
    browser = TestBrowser()
    self.assertEqual(str(browser), "<TestBrowser (not visiting a URL)>")

    body = StringIO.StringIO('<html><form name="f"><input /></form></html>')
    header_msg = mimetools.Message(
        StringIO.StringIO("Content-type: text/html"))
    closeable = _response.closeable_response(
        body, header_msg, "http://example.com/", 200, "OK")
    browser.set_response(_response.response_seek_wrapper(closeable))
    self.assertEqual(str(browser), "<TestBrowser visiting http://example.com/>")

    browser.select_form(nr=0)
    expected = """\
<TestBrowser visiting http://example.com/
selected form:
<f GET http://example.com/ application/x-www-form-urlencoded
<TextControl(<None>=)>>
>"""
    self.assertEqual(str(browser), expected)
def monkey_patch_httplib(self, putheader):
    """Replace network-facing methods of httplib.HTTPConnection with stubs.

    ``putheader`` is installed as given; ``connect``, ``send`` and ``close``
    become no-ops, and ``getresponse`` returns a canned 200 "OK" response
    with empty headers and an empty body.
    """
    def do_nothing(*args, **kwds):
        return

    def getresponse(self_):
        class Response(object):
            msg = mimetools.Message(StringIO.StringIO(""))
            status = 200
            reason = "OK"

            def read(self__, sz=-1):
                return ""
        return Response()

    # Patched in the same order as listed.
    replacements = (
        ("putheader", putheader),
        ("connect", do_nothing),
        ("send", do_nothing),
        ("close", do_nothing),
        ("getresponse", getresponse),
    )
    for method_name, stub in replacements:
        self.monkey_patch(httplib.HTTPConnection, method_name, stub)
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO.StringIO() if mode == 'ansi' else sys.stdout
row, col = self.s // self.ncol, self.s % self.ncol
desc = self.desc.tolist()
desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
outfile.write("\n".join("".join(row) for row in desc)+"\n")
if self.lastaction is not None:
outfile.write(" ({})\n".format(self.get_action_meanings()[self.lastaction]))
else:
outfile.write("\n")
return outfile
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO.StringIO() if mode == 'ansi' else sys.stdout
row, col = self.s // self.ncol, self.s % self.ncol
desc = self.desc.tolist()
desc[row][col] = utils.colorize(desc[row][col], "red", highlight=True)
outfile.write("\n".join("".join(row) for row in desc)+"\n")
if self.lastaction is not None:
outfile.write(" ({})\n".format(self.get_action_meanings()[self.lastaction]))
else:
outfile.write("\n")
return outfile
def getText(self, program_name, interval):
    """
    Return the text for ``interval`` with the named program's rewrite
    operations applied.

    :type interval: Interval.Interval
    :param program_name: key into ``self.programs`` selecting the rewrites
    :param interval: token index range (``start``/``stop``) to render
    :return: the rewritten text, or ``self.tokens.getText(interval)`` when
        the program has no rewrites
    """
    rewrites = self.programs.get(program_name)
    # Ensure start/stop are within the buffered token range.
    stop = min(interval.stop, len(self.tokens.tokens) - 1)
    start = max(interval.start, 0)

    # No instructions to execute: defer to the token stream as-is.
    if not rewrites:
        return self.tokens.getText(interval)

    out = StringIO()
    ops_by_index = self._reduceToSingleOperationPerIndex(rewrites)
    pos = start
    while pos <= stop and pos < len(self.tokens.tokens):
        op = ops_by_index.pop(pos, None)
        token = self.tokens.get(pos)
        if op is not None:
            # The operation writes its output and says where to resume.
            pos = op.execute(out)
            continue
        if token.type != Token.EOF:
            out.write(token.text)
        pos += 1

    # When the range reaches the last token, emit leftover operations
    # positioned at or beyond it.
    if stop == len(self.tokens.tokens) - 1:
        for op in ops_by_index.values():
            if op.index >= len(self.tokens.tokens) - 1:
                out.write(op.text)  # TODO: this check is probably not needed
    return out.getvalue()
def execute(self, buf):
    """
    Run this operation and report the index it yields.

    :type buf: StringIO.StringIO
    :param buf: output buffer; this implementation does not touch it
    :return: ``self.index``
    """
    resume_index = self.index
    return resume_index
def __init__(self, code, msg, headers, data, url=None):
    """Initialise the StringIO base with ``data`` and record response metadata."""
    StringIO.StringIO.__init__(self, data)
    self.code = code
    self.msg = msg
    self.headers = headers
    self.url = url
def http_open(self, req):
    """Record ``req``; report an error on the first call, then reply 200 OK.

    The first call (``self._count == 0``) routes ``self.code`` and
    ``self.headers`` through ``self.parent.error``; later calls store the
    request on ``self.req`` and return a ``MockResponse``.
    """
    import mimetools
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))
    if self._count != 0:
        self.req = req
        ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", ok_headers, "", req.get_full_url())

    self._count += 1
    reason = httplib.responses[self.code]
    error_headers = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, MockFile(), self.code, reason, error_headers)
def __init__(self, code, msg, headers, data, url=None):
    """Initialise the StringIO base with ``data`` and record response metadata."""
    StringIO.StringIO.__init__(self, data)
    self.code = code
    self.msg = msg
    self.headers = headers
    self.url = url
def http_open(self, req):
    """Record ``req``; report an error on the first call, then reply 200 OK.

    The first call (``self._count == 0``) routes ``self.code`` and
    ``self.headers`` through ``self.parent.error``; later calls store the
    request on ``self.req`` and return a ``MockResponse``.
    """
    import mimetools
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))
    if self._count != 0:
        self.req = req
        ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", ok_headers, "", req.get_full_url())

    self._count += 1
    reason = httplib.responses[self.code]
    error_headers = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, MockFile(), self.code, reason, error_headers)
def ch_en(self, response):
    """Read ``response`` and undo its HTTP ``Content-Encoding``.

    :param response: object exposing ``info()`` (headers with a ``get``
        method) and ``read()`` returning the raw body.
    :return: the body, decompressed for ``gzip``/``x-gzip``/``deflate``
        and returned exactly as read for any other encoding.
    :raises zlib.error: if a ``deflate`` body is neither a zlib stream nor
        a raw DEFLATE stream.
    """
    import io

    # Look the header up once instead of once per comparison.
    encoding = response.info().get('Content-Encoding')
    raw = response.read()

    if encoding in ('gzip', 'x-gzip'):
        # io.BytesIO avoids depending on whether ``StringIO`` names the
        # module or the class in the enclosing file.
        gz = gzip.GzipFile(fileobj=io.BytesIO(raw))
        try:
            return gz.read()
        finally:
            gz.close()

    if encoding == 'deflate':
        try:
            return zlib.decompress(raw)
        except zlib.error:
            # Some servers send a raw DEFLATE stream without the zlib
            # header; a negative wbits tells zlib to expect that.
            return zlib.decompress(raw, -zlib.MAX_WBITS)

    return raw
def __init__(self, code, msg, headers, data, url=None):
    """Initialise the StringIO base with ``data`` and record response metadata."""
    StringIO.StringIO.__init__(self, data)
    self.code = code
    self.msg = msg
    self.headers = headers
    self.url = url
def http_open(self, req):
    """Record ``req``; report an error on the first call, then reply 200 OK.

    The first call (``self._count == 0``) routes ``self.code`` and
    ``self.headers`` through ``self.parent.error``; later calls store the
    request on ``self.req`` and return a ``MockResponse``.
    """
    import mimetools
    import httplib
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))
    if self._count != 0:
        self.req = req
        ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", ok_headers, "", req.get_full_url())

    self._count += 1
    reason = httplib.responses[self.code]
    error_headers = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, MockFile(), self.code, reason, error_headers)
def __init__(self, code, msg, headers, data, url=None):
    """Initialise the StringIO base with ``data`` and record response metadata."""
    StringIO.StringIO.__init__(self, data)
    self.code = code
    self.msg = msg
    self.headers = headers
    self.url = url
def http_open(self, req):
    """Record ``req``; report an error on the first call, then reply 200 OK.

    The first call (``self._count == 0``) routes ``self.code`` and
    ``self.headers`` through ``self.parent.error``; later calls store the
    request on ``self.req`` and return a ``MockResponse``.
    """
    import mimetools
    import httplib
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))
    if self._count != 0:
        self.req = req
        ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", ok_headers, "", req.get_full_url())

    self._count += 1
    reason = httplib.responses[self.code]
    error_headers = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, MockFile(), self.code, reason, error_headers)
def __init__(self, url="http://example.com/", data=None, info=None):
    """Set up a mock response: URL, body stream, headers and close flag.

    ``info`` defaults to an empty mapping and is wrapped in ``MockHeaders``.
    """
    self.url = url
    self._url = url
    self.fp = StringIO.StringIO(data)
    headers = MockHeaders({} if info is None else info)
    # Both attribute names refer to the same headers object.
    self._info = headers
    self._headers = headers
    self.close_called = False
def __init__(self, code, msg, headers, data, url=None):
    """Initialise the StringIO base with ``data`` and record response metadata."""
    StringIO.StringIO.__init__(self, data)
    self.code = code
    self.msg = msg
    self.headers = headers
    self.url = url
def http_open(self, req):
    """Record ``req``; report an error on the first call, then a plain response.

    The first call (``self._count == 0``) routes ``self.code`` and
    ``self.headers`` through ``self.parent.error``; later calls store the
    request on ``self.req`` and return ``test_response`` for its URL.
    """
    import mimetools
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))
    if self._count != 0:
        self.req = req
        return test_response("", [], req.get_full_url())

    self._count += 1
    reason = "Not important"
    error_headers = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, test_response(), self.code, reason, error_headers)
def test_redirected_robots_txt(self):
    """Check the request sequence when the robots.txt fetch is redirected."""
    # A redirected robots.txt fetch shouldn't result in another attempted
    # robots.txt fetch to check the redirection is allowed!
    import mechanize
    from mechanize import (build_opener, HTTPHandler,
                           HTTPDefaultErrorHandler, HTTPRedirectHandler,
                           HTTPRobotRulesProcessor)

    class MockHTTPHandler(mechanize.BaseHandler):

        def __init__(self):
            self.requests = []

        def http_open(self, req):
            import mimetools
            import httplib
            import copy
            from StringIO import StringIO
            self.requests.append(copy.deepcopy(req))
            if req.get_full_url() != "http://example.com/robots.txt":
                return test_response("Allow: *", [], req.get_full_url())
            # Redirect the robots.txt fetch via a 302 error.
            redirect = "Location: http://example.com/en/robots.txt\r\n\r\n"
            headers = mimetools.Message(StringIO(redirect))
            return self.parent.error(
                "http", req, test_response(), 302, "Blah", headers)

    mock_handler = MockHTTPHandler()
    opener = build_test_opener(
        mock_handler,
        HTTPDefaultErrorHandler(),
        HTTPRedirectHandler(),
        HTTPRobotRulesProcessor(),
    )
    opener.open("http://example.com/")

    requested = [request.get_full_url() for request in mock_handler.requests]
    self.assertEqual(requested, [
        "http://example.com/robots.txt",
        "http://example.com/en/robots.txt",
        "http://example.com/",
    ])
def getFileType(self):
    """Return ``StringIO.StringIO``, or raise SkipTest if it cannot be imported."""
    try:
        from StringIO import StringIO
    except ImportError:
        raise SkipTest("StringIO.StringIO is not available.")
    return StringIO
def getFileType(self):
    """Return ``cStringIO.StringIO``, or raise SkipTest if it cannot be imported."""
    try:
        from cStringIO import StringIO
    except ImportError:
        raise SkipTest("cStringIO.StringIO is not available.")
    return StringIO
def ch_en(self, response):
    """Read ``response`` and undo its HTTP ``Content-Encoding``.

    :param response: object exposing ``info()`` (headers with a ``get``
        method) and ``read()`` returning the raw body.
    :return: the body, decompressed for ``gzip``/``x-gzip``/``deflate``
        and returned exactly as read for any other encoding.
    :raises zlib.error: if a ``deflate`` body is neither a zlib stream nor
        a raw DEFLATE stream.
    """
    import io

    # Look the header up once instead of once per comparison.
    encoding = response.info().get('Content-Encoding')
    raw = response.read()

    if encoding in ('gzip', 'x-gzip'):
        # io.BytesIO avoids depending on whether ``StringIO`` names the
        # module or the class in the enclosing file.
        gz = gzip.GzipFile(fileobj=io.BytesIO(raw))
        try:
            return gz.read()
        finally:
            gz.close()

    if encoding == 'deflate':
        try:
            return zlib.decompress(raw)
        except zlib.error:
            # Some servers send a raw DEFLATE stream without the zlib
            # header; a negative wbits tells zlib to expect that.
            return zlib.decompress(raw, -zlib.MAX_WBITS)

    return raw
def get_schema(self):
    """Return the ZServer schema, loading it on first use.

    The loaded schema is stored on ``BaseTest.schema`` so later calls skip
    the load.
    """
    if self.schema is not None:
        return self.schema

    schema_source = StringIO.StringIO("""
<schema>
<import package='ZServer'/>
<multisection name='*'
type='ZServer.server'
attribute='servers'/>
</schema>
""")
    BaseTest.schema = ZConfig.loadSchemaFile(schema_source)
    return self.schema
def load_factory(self, text):
    """Load ``text`` as a config against the schema and return its single server.

    Asserts that the loaded configuration has exactly one entry in
    ``servers``.
    """
    schema = self.get_schema()
    conf, _unused = ZConfig.loadConfigFile(schema, StringIO.StringIO(text))
    servers = conf.servers
    self.assertEqual(len(servers), 1)
    return servers[0]
def _trap_warning_output(self):
if self._old_stderr is not None:
return
import sys
from StringIO import StringIO
self._old_stderr = sys.stderr
self._our_stderr_stream = sys.stderr = StringIO()
def test_ftp(self):
    """Check how FTPHandler.ftp_open parses FTP URLs and builds its response.

    ``connect_ftp`` is overridden to record its arguments and return a
    wrapper that serves fixed data, so nothing connects to a server. For
    each URL the recorded host, port, directories, timeout, file name and
    transfer type are checked, along with the response's Content-type and
    Content-length headers.
    """
    class MockFTPWrapper:

        def __init__(self, data):
            self.data = data

        def retrfile(self, filename, filetype):
            # Record what was asked for so the test can inspect it.
            self.filename, self.filetype = filename, filetype
            return StringIO.StringIO(self.data), len(self.data)

    class NullFTPHandler(mechanize.FTPHandler):

        def __init__(self, data):
            self.data = data

        def connect_ftp(self, user, passwd, host, port, dirs, timeout):
            self.user, self.passwd = user, passwd
            self.host, self.port = host, port
            self.dirs = dirs
            self.timeout = timeout
            self.ftpwrapper = MockFTPWrapper(self.data)
            return self.ftpwrapper

    import ftplib
    import socket
    data = "rheum rhaponicum"
    h = NullFTPHandler(data)
    h.parent = MockOpener()

    for url, host, port, type_, dirs, timeout, filename, mimetype in [
        ("ftp://localhost/foo/bar/baz.html", "localhost", ftplib.FTP_PORT,
         "I", ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
         "baz.html", "text/html"),
        ("ftp://localhost:80/foo/bar/", "localhost", 80, "D",
         ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "", None),
        ("ftp://localhost/baz.gif;type=a", "localhost", ftplib.FTP_PORT,
         "A", [], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "baz.gif",
         None),  # TODO: really this should guess image/gif
    ]:
        req = Request(url, timeout=timeout)
        r = h.ftp_open(req)
        # ftp authentication not yet implemented by FTPHandler
        self.assertTrue(h.user == h.passwd == "")
        self.assertEqual(h.host, socket.gethostbyname(host))
        self.assertEqual(h.port, port)
        self.assertEqual(h.dirs, dirs)
        if sys.version_info >= (2, 6):
            # assertEqual, not the deprecated assertEquals alias, to match
            # every other assertion in this test.
            self.assertEqual(h.timeout, timeout)
        self.assertEqual(h.ftpwrapper.filename, filename)
        self.assertEqual(h.ftpwrapper.filetype, type_)
        headers = r.info()
        self.assertEqual(headers.get("Content-type"), mimetype)
        self.assertEqual(int(headers["Content-length"]), len(data))