# From httplib2 (Python 2). Requires gzip, StringIO and zlib imports;
# FailedToDecompressContent and _() come from the enclosing module.
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way that won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(
            _("Content purported to be compressed with %s but failed to decompress.")
            % response.get('content-encoding'), response, content)
    return content
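# A minimal round-trip sketch (Python 2), assuming _decompressContent and its
# imports are in scope; a plain dict stands in for httplib2's response object.
import gzip
import StringIO

buf = StringIO.StringIO()
gz = gzip.GzipFile(fileobj=buf, mode='wb')
gz.write('hello world')
gz.close()

response = {'content-encoding': 'gzip'}
assert _decompressContent(response, buf.getvalue()) == 'hello world'
assert response['-content-encoding'] == 'gzip'  # original header preserved under a new key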
def render(file_name):
    """
    Render an SLS file using the slsutil Salt execution module.

    Args:
        file_name (str): the SLS file path
    """
    err = StringIO.StringIO()
    out = StringIO.StringIO()
    exception = None
    with redirect_stderr(err):
        with redirect_stdout(out):
            try:
                result = SLSRenderer.caller.cmd('slsutil.renderer', file_name)
            except salt.exceptions.SaltException as ex:
                exception = StageRenderingException(file_name, ex.strerror)
    if exception:
        # pylint: disable=E0702
        raise exception
    logger.info("Rendered SLS file %s, stdout\n%s", file_name, out.getvalue())
    logger.debug("Rendered SLS file %s, stderr\n%s", file_name, err.getvalue())
    return result, out.getvalue(), err.getvalue()
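# The same capture pattern in a self-contained form using the standard
# library's contextlib.redirect_stdout (Python 3); render() above gets its
# redirect_stdout/redirect_stderr helpers from its own module.
import contextlib
import io

captured = io.StringIO()
with contextlib.redirect_stdout(captured):
    print("captured")
assert captured.getvalue() == "captured\n"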
def topngbytes(name, rows, x, y, **k):
    """Convenience function for creating a PNG file "in memory" as a
    string. Creates a :class:`Writer` instance using the keyword arguments,
    then passes `rows` to its :meth:`Writer.write` method. The resulting
    PNG file is returned as a string. `name` is used to identify the file for
    debugging.
    """
    import os
    print(name)
    f = BytesIO()
    w = Writer(x, y, **k)
    w.write(f, rows)
    if os.environ.get('PYPNG_TEST_TMP'):
        w = open(name, 'wb')
        w.write(f.getvalue())
        w.close()
    return f.getvalue()
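# Hedged usage sketch: assumes pypng's Writer and io's BytesIO are bound to
# the names used in this test module (e.g. from png import Writer).
rows = [[0, 64, 128], [255, 128, 0]]     # 2 rows of a 3x2 greyscale image
data = topngbytes('grey3x2.png', rows, 3, 2, greyscale=True, bitdepth=8)
assert data[:8] == b'\x89PNG\r\n\x1a\n'  # PNG signature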
def testPtrns(self):
    "Test colour type 3 and tRNS chunk (and 4-bit palette)."
    a = (50, 99, 50, 50)
    b = (200, 120, 120, 80)
    c = (255, 255, 255)
    d = (200, 120, 120)
    e = (50, 99, 50)
    w = Writer(3, 3, bitdepth=4, palette=[a, b, c, d, e])
    f = BytesIO()
    w.write_array(f, array('B', (4, 3, 2, 3, 2, 0, 2, 0, 1)))
    r = Reader(bytes=f.getvalue())
    x, y, pixels, meta = r.asRGBA8()
    self.assertEqual(x, 3)
    self.assertEqual(y, 3)
    c = c + (255,)
    d = d + (255,)
    e = e + (255,)
    boxed = [(e, d, c), (d, c, a), (c, a, b)]
    flat = map(lambda row: itertools.chain(*row), boxed)
    self.assertEqual(map(list, pixels), map(list, flat))
def testPAMin(self):
    """Test that the command line tool can read PAM file."""
    def do():
        return _main(['testPAMin'])
    s = BytesIO()
    s.write(strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                       'TUPLTYPE RGB_ALPHA\nENDHDR\n'))
    # The pixels, in flat row flat pixel format
    flat = [255, 0, 0, 255, 0, 255, 0, 120, 0, 0, 255, 30]
    asbytes = seqtobytes(flat)
    s.write(asbytes)
    s.flush()
    s.seek(0)
    o = BytesIO()
    testWithIO(s, o, do)
    r = Reader(bytes=o.getvalue())
    x, y, pixels, meta = r.read()
    self.assertTrue(r.alpha)
    self.assertTrue(not r.greyscale)
    self.assertEqual(list(itertools.chain(*pixels)), flat)
def fetch_quote(symbols, timestamp, cached_file=None):
    url = URL % '+'.join(symbols)
    if not cached_file:
        # fetch
        log('Fetching %s' % url)
        fp = urllib.urlopen(url)
        try:
            data = fp.read()
        finally:
            fp.close()
        # log result
        if LOG_DATA_FETCHED:
            log_filename = LOG_FILENAME % timestamp.replace(':', '-')
            out = open(log_filename, 'wb')
            try:
                log('Fetched %s bytes logged in %s' % (len(data), log_filename))
                out.write(data)
            finally:
                out.close()
    else:
        data = open(cached_file, 'rb').read()
    return StringIO(data)
def request(self, endpoint, post=None):
    buffer = BytesIO()
    ch = pycurl.Curl()
    ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
    ch.setopt(pycurl.USERAGENT, self.userAgent)
    ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
    ch.setopt(pycurl.FOLLOWLOCATION, True)
    ch.setopt(pycurl.HEADER, True)
    ch.setopt(pycurl.VERBOSE, False)
    ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username,
                                              self.username + "-cookies.dat"))
    ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username,
                                             self.username + "-cookies.dat"))
    if post is not None:
        ch.setopt(pycurl.POST, True)
        ch.setopt(pycurl.POSTFIELDS, post)
    if self.proxy:
        ch.setopt(pycurl.PROXY, self.proxyHost)
        if self.proxyAuth:
            ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)
    ch.perform()
    resp = buffer.getvalue()
    header_len = ch.getinfo(pycurl.HEADER_SIZE)
    header = resp[0:header_len]
    body = resp[header_len:]
    ch.close()
    if self.debug:
        print("REQUEST: " + endpoint)
        if post is not None:
            if not isinstance(post, list):
                print("DATA: " + str(post))
        print("RESPONSE: " + body)
    return [header, json_decode(body)]
def run_test(innerHTML, input, expected, errors, treeClass):
    try:
        p = html5parser.HTMLParser(tree=treeClass["builder"])
        if innerHTML:
            document = p.parseFragment(StringIO.StringIO(input), innerHTML)
        else:
            document = p.parse(StringIO.StringIO(input))
    except constants.DataLossWarning:
        # Ignore testcases we know we don't pass
        return
    document = treeClass.get("adapter", lambda x: x)(document)
    try:
        output = convertTokens(treeClass["walker"](document))
        output = attrlist.sub(sortattrs, output)
        expected = attrlist.sub(sortattrs, convertExpected(expected))
        assert expected == output, "\n".join([
            "", "Input:", input,
            "", "Expected:", expected,
            "", "Received:", output
        ])
    except NotImplementedError:
        pass  # Amnesty for those that confess...
# From a Spark preprocessing script (Python 2). Requires: import csv, StringIO.
def loadRecord(line):
    """
    Parse one line of a CSV file.
    """
    input_line = StringIO.StringIO(line)
    # Alternative readers:
    # row = unicodecsv.reader(input_line, encoding="utf-8")
    # return row.next()
    # reader = csv.DictReader(input_line, fieldnames=["id", "qid1", "qid2",
    #                         "question1", "question2", "is_duplicate"])
    reader = csv.reader(input_line)
    return reader.next()
    # data = []
    # for row in reader:
    #     print row
    #     data.append([unicode(cell, "utf-8") for cell in row])
    # return data[0]

# raw_data = sc.textFile(train_file_path).map(loadRecord)
# print raw_data.take(10)
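# Quick check of the pattern (Python 2, csv and StringIO imported as above):
# quoted commas survive the round trip through csv.reader over StringIO.
print loadRecord('1,2,3,"how, are you?",fine,0')
# -> ['1', '2', '3', 'how, are you?', 'fine', '0']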
def feed(self, markup):
    if isinstance(markup, bytes):
        markup = BytesIO(markup)
    elif isinstance(markup, unicode):
        markup = StringIO(markup)
    # Call feed() at least once, even if the markup is empty,
    # or the parser won't be initialized.
    data = markup.read(self.CHUNK_SIZE)
    try:
        self.parser = self.parser_for(self.soup.original_encoding)
        self.parser.feed(data)
        while len(data) != 0:
            # Now call feed() on the rest of the data, chunk by chunk.
            data = markup.read(self.CHUNK_SIZE)
            if len(data) != 0:
                self.parser.feed(data)
        self.parser.close()
    except (UnicodeDecodeError, LookupError, etree.ParserError), e:
        raise ParserRejectedMarkup(str(e))
def CurlPOST(url, data, cookie):
    c = pycurl.Curl()
    b = StringIO.StringIO()
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.POST, 1)
    c.setopt(pycurl.HTTPHEADER, ['Content-Type: application/json'])
    # c.setopt(pycurl.TIMEOUT, 10)
    c.setopt(pycurl.WRITEFUNCTION, b.write)
    c.setopt(pycurl.COOKIEFILE, cookie)
    c.setopt(pycurl.COOKIEJAR, cookie)
    c.setopt(pycurl.POSTFIELDS, data)
    c.perform()
    html = b.getvalue()
    b.close()
    c.close()
    return html
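# Hedged usage sketch; the endpoint, payload and cookie path are hypothetical.
resp = CurlPOST('https://example.com/api/login',
                '{"user": "alice", "password": "s3cret"}',
                '/tmp/session-cookies.txt')
print resp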
def test_pop_zipfile():
    sio = StringIO()
    zf = wheel.install.VerifyingZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.close()
    try:
        zf.pop()
    except RuntimeError:
        pass  # already closed
    else:
        raise Exception("expected RuntimeError")
    zf = wheel.install.VerifyingZipFile(sio, 'a')
    zf.pop()
    zf.close()
    zf = wheel.install.VerifyingZipFile(sio, 'r')
    assert len(zf.infolist()) == 1
def __init__(self, body, mimetype='application/octet-stream',
             chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

    DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO
    for the stream.

    Args:
      body: string, Bytes of body content.
      mimetype: string, Mime-type of the file or default of
        'application/octet-stream'.
      chunksize: int, File will be uploaded in chunks of this many bytes. Only
        used if resumable=True.
      resumable: bool, True if this is a resumable upload. False means upload
        in a single request.
    """
    fd = StringIO.StringIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
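# Minimal sketch, assuming the apiclient module defining MediaInMemoryUpload
# is importable (the class is deprecated per its own docstring; shown only to
# illustrate the StringIO-backed constructor):
upload = MediaInMemoryUpload('hello world', mimetype='text/plain')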
def emit(events, stream=None, Dumper=Dumper,
         canonical=None, indent=None, width=None,
         allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    getvalue = None
    if stream is None:
        from StringIO import StringIO
        stream = StringIO()
        getvalue = stream.getvalue
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
                    allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if getvalue:
        return getvalue()
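# Round-trip sketch using PyYAML's public event classes: the event stream for
# a single scalar document comes back out as a YAML string.
import yaml

events = [
    yaml.StreamStartEvent(),
    yaml.DocumentStartEvent(),
    yaml.ScalarEvent(anchor=None, tag=None, implicit=(True, True), value='hello'),
    yaml.DocumentEndEvent(),
    yaml.StreamEndEvent(),
]
print(yaml.emit(events))  # -> hello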
def test_object_pairs_hook(self):
    s = '{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}'
    p = [("xkd", 1), ("kcw", 2), ("art", 3), ("hxm", 4),
         ("qrt", 5), ("pad", 6), ("hoy", 7)]
    self.assertEqual(self.loads(s), eval(s))
    self.assertEqual(self.loads(s, object_pairs_hook=lambda x: x), p)
    self.assertEqual(self.json.load(StringIO(s),
                                    object_pairs_hook=lambda x: x), p)
    od = self.loads(s, object_pairs_hook=OrderedDict)
    self.assertEqual(od, OrderedDict(p))
    self.assertEqual(type(od), OrderedDict)
    # the object_pairs_hook takes priority over the object_hook
    self.assertEqual(self.loads(s, object_pairs_hook=OrderedDict,
                                object_hook=lambda x: None),
                     OrderedDict(p))
    # check that empty object literals work (see #17368)
    self.assertEqual(self.loads('{}', object_pairs_hook=OrderedDict),
                     OrderedDict())
    self.assertEqual(self.loads('{"empty": {}}',
                                object_pairs_hook=OrderedDict),
                     OrderedDict([('empty', OrderedDict())]))
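# Standalone illustration of the same hook with the stdlib json module:
# reading a document from a StringIO stream while preserving key order.
import json
from collections import OrderedDict
from io import StringIO

doc = StringIO(u'{"b": 1, "a": 2}')
assert json.load(doc, object_pairs_hook=OrderedDict) == OrderedDict([("b", 1), ("a", 2)])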