def generate_info():
    tickets_archive_path = ROOT_DIR_PATH.joinpath('tickets.zip')
    ensure_data_file(tickets_archive_path, DATA_FILE_INFO['TICKETS_URL'])
    with zipfile.ZipFile(str(tickets_archive_path)) as zf:
        for name in zf.namelist():
            stem, ext = os.path.splitext(name)
            if ext != '.csv':
                continue
            with zf.open(name) as f:
                # Zipfile only opens file in binary mode, but csv only accepts
                # text files, so we need to wrap this.
                # See <https://stackoverflow.com/questions/5627954>.
                textfile = io.TextIOWrapper(f, encoding='utf8', newline='')
                for row in csv.DictReader(textfile):
                    yield Registration(row)

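The wrapping trick in the comment above generalizes: any binary file object, including a member opened from a ZipFile, can be fed to the csv module once it is wrapped in io.TextIOWrapper. A minimal, self-contained sketch of just that pattern; the archive and member names are placeholders, not taken from the snippet above:

import csv
import io
import zipfile

# Hypothetical archive containing a UTF-8 encoded CSV member.
with zipfile.ZipFile('example.zip') as zf:
    with zf.open('data.csv') as raw:                    # binary file object
        text = io.TextIOWrapper(raw, encoding='utf8', newline='')
        for row in csv.DictReader(text):
            print(row)                                  # dict keyed by the CSV header
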
def run(self, *args, **options):
    p_path = os.path.join('/proc', str(os.getppid()), 'cmdline')
    with open(p_path, 'rb') as f:
        p_cmdline = f.read().split(b'\x00')
    p = None
    if b'runserver' not in p_cmdline:
        self.stdout.write("Starting webpack-dev-server...")
        p = webpack_dev_server()
        wrapper = io.TextIOWrapper(p.stdout, line_buffering=True)
        first_line = next(wrapper)
        webpack_host = first_line.split()[-1]
        print(webpack_host)
    super().run(**options)
    if p:
        p.kill()
        p.wait()

def open_str(self, name: str, encoding='utf8'):
    """Open a file in unicode mode or raise FileNotFoundError.

    The return value is a StringIO in-memory buffer.
    """
    with self:
        # File() calls with the VPK object we need directly.
        if isinstance(name, VPKFile):
            file = name
        else:
            try:
                file = self._ref[name]
            except KeyError:
                raise FileNotFoundError(name)
        # Wrap the data to treat it as bytes, then
        # wrap that to decode and clean up universal newlines.
        return io.TextIOWrapper(io.BytesIO(file.read()), encoding)

def popen(cmd, mode="r", buffering=-1):
    if not isinstance(cmd, str):
        raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
    if mode not in ("r", "w"):
        raise ValueError("invalid mode %r" % mode)
    if buffering == 0 or buffering is None:
        raise ValueError("popen() does not support unbuffered streams")
    import subprocess, io
    if mode == "r":
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdout=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
    else:
        proc = subprocess.Popen(cmd,
                                shell=True,
                                stdin=subprocess.PIPE,
                                bufsize=buffering)
        return _wrap_close(io.TextIOWrapper(proc.stdin), proc)
# Helper for popen() -- a proxy for a file whose close waits for the process

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', help='Config file')
    args = parser.parse_args()
    if args.config:
        with open(args.config) as input:
            config = json.load(input)
    else:
        config = {}
    if not config.get('disable_collection', False):
        logger.info('Sending version information to stitchdata.com. ' +
                    'To disable sending anonymous usage data, set ' +
                    'the config parameter "disable_collection" to true')
        threading.Thread(target=send_usage_stats).start()
    input = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
    state = persist_lines(config.get('delimiter', ','),
                          config.get('quotechar', '"'),
                          input)
    emit_state(state)
    logger.debug("Exiting normally")

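Wrapping sys.stdin.buffer as above is the usual way to read stdin with a fixed encoding regardless of the locale; a minimal sketch of the same idea in isolation:

import io
import sys

# Force UTF-8 decoding of stdin even if the locale default differs.
stdin_utf8 = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
for line in stdin_utf8:
    print(repr(line))
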
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text

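The backport above reproduces the layering that socket.makefile() performs: a raw stream, an optional buffer, and a TextIOWrapper on top for text modes. The same stack can be built by hand over any raw stream; a rough sketch over a plain file (the path is a placeholder):

import io

# Build the raw -> buffered -> text stack manually.
raw = io.FileIO('example.txt', 'r')                  # placeholder path
buffered = io.BufferedReader(raw, io.DEFAULT_BUFFER_SIZE)
text = io.TextIOWrapper(buffered, encoding='utf-8', newline=None)
print(text.readline())
text.close()                                         # closes the whole stack
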
def handle_display_options(self, option_order):
    """If there were any non-global "display-only" options
    (--help-commands or the metadata display options) on the command
    line, display the requested info and return true; else return
    false.
    """
    import sys
    if six.PY2 or self.help_commands:
        return _Distribution.handle_display_options(self, option_order)
    # Stdout may be StringIO (e.g. in tests)
    import io
    if not isinstance(sys.stdout, io.TextIOWrapper):
        return _Distribution.handle_display_options(self, option_order)
    # Don't wrap stdout if utf-8 is already the encoding. Provides
    # workaround for #334.
    if sys.stdout.encoding.lower() in ('utf-8', 'utf8'):
        return _Distribution.handle_display_options(self, option_order)
    # Print metadata in UTF-8 no matter the platform
    encoding = sys.stdout.encoding
    errors = sys.stdout.errors
    newline = sys.platform != 'win32' and '\n' or None
    line_buffering = sys.stdout.line_buffering
    sys.stdout = io.TextIOWrapper(
        sys.stdout.detach(), 'utf-8', errors, newline, line_buffering)
    try:
        return _Distribution.handle_display_options(self, option_order)
    finally:
        sys.stdout = io.TextIOWrapper(
            sys.stdout.detach(), encoding, errors, newline, line_buffering)

def gettext_popen_wrapper(args, os_err_exc_type=CommandError, stdout_encoding="utf-8"):
    """
    Makes sure text obtained from stdout of gettext utilities is Unicode.
    """
    # This both decodes utf-8 and cleans line endings. Simply using
    # popen_wrapper(universal_newlines=True) doesn't properly handle the
    # encoding. This goes back to popen's flaky support for encoding:
    # https://bugs.python.org/issue6135. This is a solution for #23271, #21928.
    # No need to do anything on Python 2 because it's already a byte-string there.
    manual_io_wrapper = six.PY3 and stdout_encoding != DEFAULT_LOCALE_ENCODING
    stdout, stderr, status_code = popen_wrapper(args, os_err_exc_type=os_err_exc_type,
                                                universal_newlines=not manual_io_wrapper)
    if manual_io_wrapper:
        stdout = io.TextIOWrapper(io.BytesIO(stdout), encoding=stdout_encoding).read()
    if six.PY2:
        stdout = stdout.decode(stdout_encoding)
    return stdout, stderr, status_code

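Wrapping the captured bytes in io.BytesIO and then io.TextIOWrapper, as above, both decodes and normalizes line endings in one pass; a small sketch of that pattern on its own:

import io

captured = b'first line\r\nsecond line\r\n'          # e.g. bytes captured from a subprocess
text = io.TextIOWrapper(io.BytesIO(captured), encoding='utf-8').read()
assert text == 'first line\nsecond line\n'           # CRLF collapsed by universal newlines
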
def anyfile(infile, mode='r', encoding="utf8"):
    '''
    Return a file handle with support for gzip/zip compressed files.
    If infile is a two-value tuple, then the first one is the compressed file
    and the second one is the actual filename inside the compressed file,
    e.g., ('a.zip', 'aa.txt').
    '''
    if isinstance(infile, tuple):
        infile, rawfile = infile[:2]
    else:
        rawfile = os.path.splitext(infile)[0]
    filetype = os.path.splitext(infile)[1].lower()
    if filetype == '.gz':
        import gzip
        in_f = io.TextIOWrapper(gzip.GzipFile(infile, 'r'), encoding=encoding)
    elif filetype == '.zip':
        import zipfile
        in_f = io.TextIOWrapper(zipfile.ZipFile(infile, 'r').open(rawfile, 'r'), encoding=encoding)
    else:
        in_f = open(infile, mode, encoding=encoding)
    return in_f

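A usage sketch for anyfile(), including the tuple form described in its docstring; the file names are placeholders:

# Plain text, gzip, and a member inside a zip archive all yield a text handle.
for source in ('records.txt', 'records.txt.gz', ('archive.zip', 'records.txt')):
    fh = anyfile(source)
    print(fh.readline())
    fh.close()
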
def open_str(self, name: str, encoding='utf8'):
    """Return a string buffer for a 'file'.

    This performs universal newlines conversion.
    The encoding argument is ignored for files which are
    originally text.
    """
    # We don't need this, but it should match other filesystems.
    self._check_open()
    try:
        filename, data = self._mapping[self._clean_path(name)]
    except KeyError:
        raise FileNotFoundError(name)
    if isinstance(data, bytes):
        # Decode on the fly, with universal newlines.
        return io.TextIOWrapper(
            io.BytesIO(data),
            encoding=encoding,
        )
    else:
        # None = universal newlines mode directly.
        # No encoding is needed obviously.
        return io.StringIO(data, newline=None)

def read_cases(file):
    open = try_open_zip(file)
    if not open:
        file.seek(0)
        open = try_open_tar(file)
    if not open:
        raise FormatError(file, 'not a zip file or tar file')
    try:
        config = TextIOWrapper(open('config.ini'),
                               encoding='utf-8', errors='replace')
        return read_legacy_cases(config, open)
    except FileNotFoundError:
        pass
    try:
        config = open('config.yaml')
        return read_yaml_cases(config, open)
    except FileNotFoundError:
        pass
    raise FormatError('config file not found')

def load_source(self):
    """Load the source for the specified file."""
    if self.filename in self.STDIN_NAMES:
        self.filename = 'stdin'
        if sys.version_info[0] < 3:
            self.source = sys.stdin.read()
        else:
            self.source = TextIOWrapper(sys.stdin.buffer,
                                        errors='ignore').read()
    else:
        # Could be a Python 2.7 StringIO with no context manager, sigh.
        # with tokenize_open(self.filename) as fd:
        #     self.source = fd.read()
        handle = tokenize_open(self.filename)
        self.source = handle.read()
        handle.close()

def __write_merged_imports(self, file: TextIOWrapper, imports: list):
    """
    Sorts and writes the given imports to the file, adding a blank line
    between each group.

    file -- the file object (needs write permission)
    imports -- the list of imports
    """
    sorted_imports = self.sort_imports(imports)
    previous_group = -1
    for imp in sorted_imports:
        group = self.get_import_group(imp)
        if group != previous_group:
            if previous_group != -1:
                file.write("\n")
            previous_group = group
        file.write(imp)

def source_to_unicode(txt, errors='replace', skip_encoding_cookie=True):
    """Converts a bytes string with python source code to unicode.

    Unicode strings are passed through unchanged. Byte strings are checked
    for the python source file encoding cookie to determine encoding.
    txt can be either a bytes buffer or a string containing the source
    code.
    """
    if isinstance(txt, str):
        return txt
    if isinstance(txt, bytes):
        buffer = BytesIO(txt)
    else:
        buffer = txt
    try:
        encoding, _ = detect_encoding(buffer.readline)
    except SyntaxError:
        encoding = "ascii"
    buffer.seek(0)
    text = TextIOWrapper(buffer, encoding, errors=errors, line_buffering=True)
    text.mode = 'r'
    if skip_encoding_cookie:
        return u"".join(strip_encoding_cookie(text))
    else:
        return text.read()

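The detect_encoding() used above is the cookie detector from the tokenize module; a minimal sketch of the same decode path (strip_encoding_cookie is specific to the snippet above and omitted here):

import io
from tokenize import detect_encoding

source_bytes = b'# -*- coding: latin-1 -*-\nname = "caf\xe9"\n'
buf = io.BytesIO(source_bytes)
encoding, _ = detect_encoding(buf.readline)          # reads the coding cookie
buf.seek(0)
text = io.TextIOWrapper(buf, encoding, errors='replace').read()
print(encoding, repr(text))
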
def test_fileobj_readlines(self):
    self.tar.extract("ustar/regtype", TEMPDIR)
    tarinfo = self.tar.getmember("ustar/regtype")
    with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
        lines1 = fobj1.readlines()
    fobj = self.tar.extractfile(tarinfo)
    try:
        fobj2 = io.TextIOWrapper(fobj)
        lines2 = fobj2.readlines()
        self.assertTrue(lines1 == lines2,
                        "fileobj.readlines() failed")
        self.assertTrue(len(lines2) == 114,
                        "fileobj.readlines() failed")
        self.assertTrue(lines2[83] ==
                        "I will gladly admit that Python is not the fastest running scripting language.\n",
                        "fileobj.readlines() failed")
    finally:
        fobj.close()

def test_add(self):
    # Add copies of a sample message
    keys = []
    keys.append(self._box.add(self._template % 0))
    self.assertEqual(len(self._box), 1)
    keys.append(self._box.add(mailbox.Message(_sample_message)))
    self.assertEqual(len(self._box), 2)
    keys.append(self._box.add(email.message_from_string(_sample_message)))
    self.assertEqual(len(self._box), 3)
    keys.append(self._box.add(io.BytesIO(_bytes_sample_message)))
    self.assertEqual(len(self._box), 4)
    keys.append(self._box.add(_sample_message))
    self.assertEqual(len(self._box), 5)
    keys.append(self._box.add(_bytes_sample_message))
    self.assertEqual(len(self._box), 6)
    with self.assertWarns(DeprecationWarning):
        keys.append(self._box.add(
            io.TextIOWrapper(io.BytesIO(_bytes_sample_message))))
    self.assertEqual(len(self._box), 7)
    self.assertEqual(self._box.get_string(keys[0]), self._template % 0)
    for i in (1, 2, 3, 4, 5, 6):
        self._check_sample(self._box[keys[i]])

def open(self, mode='r', encoding=None):
    """Return file-like object

    Args:
        mode (str): access mode (only reading modes are supported)
        encoding (str): text decoding method for text access (default: system default)

    Returns:
        io.BytesIO OR io.TextIOWrapper: buffer accessing the file as bytes or characters
    """
    access_type = self._get_access_type(mode)
    if access_type == 't' and encoding is not None and encoding != self.encoded_with:
        warnings.warn('Attempting to decode %s as "%s", but encoding is declared as "%s"'
                      % (self, encoding, self.encoded_with))
    if encoding is None:
        encoding = self.encoded_with
    buffer = io.BytesIO(self._contents)
    if access_type == 'b':
        return buffer
    else:
        return io.TextIOWrapper(buffer, encoding=encoding)

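The bytes-versus-text branch above comes down to choosing between returning the BytesIO buffer and wrapping it in a TextIOWrapper; a sketch of that choice using a hypothetical helper, not part of the class above:

import io

contents = 'héllo\n'.encode('utf-8')

def open_contents(mode='r', encoding='utf-8'):
    # 'rb' returns raw bytes, 'r' returns decoded characters.
    buffer = io.BytesIO(contents)
    if 'b' in mode:
        return buffer
    return io.TextIOWrapper(buffer, encoding=encoding)

print(open_contents('rb').read())    # b'h\xc3\xa9llo\n'
print(open_contents('r').read())     # 'héllo\n'
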
def open(filename):
    """Open a file in read only mode using the encoding detected by
    detect_encoding().
    """
    buffer = _builtin_open(filename, 'rb')
    try:
        encoding, lines = detect_encoding(buffer.readline)
        buffer.seek(0)
        text = TextIOWrapper(buffer, encoding, line_buffering=True)
        text.mode = 'r'
        return text
    except:
        buffer.close()
        raise

def load(self, file):
    try:
        tree = json.load(io.TextIOWrapper(file, encoding='utf8'))
        tree = Audit.SCHEMA.validate(tree)
        self.__artifact = Artifact.fromData(tree["artifact"])
        self.__references = {
            r["artifact-id"] : Artifact.fromData(r) for r in tree["references"]
        }
    except schema.SchemaError as e:
        raise ParseError("Invalid audit record: " + str(e))
    except ValueError as e:
        raise ParseError("Invalid json: " + str(e))
    self.__validate()

def save(self, file):
    tree = {
        "artifact" : self.__artifact.dump(),
        "references" : [ a.dump() for a in self.__references.values() ]
    }
    try:
        with gzip.open(file, 'wb', 6) as gzf:
            # Close the text wrapper before the gzip stream is closed so any
            # buffered JSON text is flushed into the archive.
            with io.TextIOWrapper(gzf, encoding='utf8') as wrapper:
                json.dump(tree, wrapper)
    except OSError as e:
        raise BuildError("Cannot write audit: " + str(e))

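Ordering matters when a TextIOWrapper sits on top of a gzip stream: the wrapper has to be flushed or closed before the gzip file, or buffered JSON text can be lost. A small round-trip sketch of the same pattern; the file name is a placeholder:

import gzip
import io
import json

payload = {"artifact": "example", "references": []}

with gzip.open('audit.json.gz', 'wb', 6) as gzf:
    with io.TextIOWrapper(gzf, encoding='utf8') as wrapper:
        json.dump(payload, wrapper)                  # wrapper flushes before gzf closes

with gzip.open('audit.json.gz', 'rb') as gzf:
    print(json.load(io.TextIOWrapper(gzf, encoding='utf8')))
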
def make_a_snapshot(file_name, output_name, delay=DEFAULT_DELAY):
    file_type = output_name.split('.')[-1]
    pixel_ratio = 2
    shell_flag = False
    if sys.platform == 'win32':
        shell_flag = True
    __actual_delay_in_ms = int(delay * 1000)
    # add shell=True and it works on Windows now.
    proc_params = [
        PHANTOMJS_EXE,
        os.path.join(get_resource_dir('phantomjs'), 'snapshot.js'),
        file_name.replace('\\', '/'),
        file_type,
        str(__actual_delay_in_ms),
        str(pixel_ratio)
    ]
    proc = subprocess.Popen(
        proc_params, stdout=subprocess.PIPE, shell=shell_flag)
    if PY2:
        content = proc.stdout.read()
        content = content.decode('utf-8')
    else:
        content = io.TextIOWrapper(proc.stdout, encoding="utf-8").read()
    content_array = content.split(',')
    if len(content_array) != 2:
        raise Exception("No snapshot taken by phantomjs. " +
                        "Please make sure it is installed " +
                        "and available on your path")
    base64_imagedata = content_array[1]
    imagedata = decode_base64(base64_imagedata.encode('utf-8'))
    if file_type in ['pdf', 'gif']:
        save_as(imagedata, output_name, file_type)
    elif file_type in ['png', 'jpeg']:
        save_as_png(imagedata, output_name)
    else:
        raise Exception(NOT_SUPPORTED_FILE_TYPE % file_type)

def _wrap_reader_for_text(fp, encoding):
    if isinstance(fp.read(0), bytes):
        fp = io.TextIOWrapper(io.BufferedReader(fp), encoding)
    return fp


def _wrap_writer_for_text(fp, encoding):
    try:
        fp.write('')
    except TypeError:
        fp = io.TextIOWrapper(fp, encoding)
    return fp

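Both helpers probe the stream to decide whether wrapping is needed: the reader checks whether read(0) yields bytes, the writer checks whether writing an empty str raises TypeError. A short usage sketch against in-memory streams:

import io

binary_in = io.BytesIO(b'hello\n')
text_in = io.StringIO('hello\n')
print(_wrap_reader_for_text(binary_in, 'utf-8').read())   # wrapped, yields str
print(_wrap_reader_for_text(text_in, 'utf-8').read())     # already text, returned as-is

binary_out = io.BytesIO()
writer = _wrap_writer_for_text(binary_out, 'utf-8')
writer.write('hello\n')
writer.flush()
print(binary_out.getvalue())                              # b'hello\n'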