def test_readline(self):
with open(TESTFN, 'wb') as f:
f.write(b'A\nB\r\nC\r')
# Fill TextIOWrapper buffer.
f.write(b'123456789\n' * 1000)
# Issue #20501: readline() shouldn't read whole file.
f.write(b'\x80')
self.addCleanup(safe_unlink, TESTFN)
with FileInput(files=TESTFN,
openhook=hook_encoded('ascii'), bufsize=8) as fi:
try:
self.assertEqual(fi.readline(), 'A\n')
self.assertEqual(fi.readline(), 'B\n')
self.assertEqual(fi.readline(), 'C\n')
except UnicodeDecodeError:
self.fail('Read to end of file')
with self.assertRaises(UnicodeDecodeError):
# Read to the end of file.
list(fi)
self.assertEqual(fi.readline(), '')
self.assertEqual(fi.readline(), '')
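The test above drives FileInput through an openhook so each file is decoded as ASCII; per issue #20501, readline() must raise UnicodeDecodeError at the offending byte instead of reading the whole file first. A minimal stand-alone sketch of the same pattern (the file name is made up; note that the bufsize argument used in the test is deprecated, and eventually removed, in newer Python versions):

import fileinput

# Hypothetical log file that should be pure ASCII; a stray non-ASCII byte
# surfaces as UnicodeDecodeError on the line containing it, not earlier.
with fileinput.FileInput(files='ascii_only.log',
                         openhook=fileinput.hook_encoded('ascii')) as fi:
    for line in fi:
        print(line, end='')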
def test_nextfile_oserror_deleting_backup(self):
"""Tests invoking FileInput.nextfile() when the attempt to delete
the backup file would raise OSError. This error is expected to be
silently ignored"""
os_unlink_orig = os.unlink
os_unlink_replacement = UnconditionallyRaise(OSError)
try:
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t], inplace=True) as fi:
next(fi) # make sure the file is opened
os.unlink = os_unlink_replacement
fi.nextfile()
finally:
os.unlink = os_unlink_orig
# sanity check to make sure that our test scenario was actually hit
self.assertTrue(os_unlink_replacement.invoked,
"os.unlink() was not invoked")
def test_readline_os_fstat_raises_OSError(self):
"""Tests invoking FileInput.readline() when os.fstat() raises OSError.
This exception should be silently discarded."""
os_fstat_orig = os.fstat
os_fstat_replacement = UnconditionallyRaise(OSError)
try:
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t], inplace=True) as fi:
os.fstat = os_fstat_replacement
fi.readline()
finally:
os.fstat = os_fstat_orig
# sanity check to make sure that our test scenario was actually hit
self.assertTrue(os_fstat_replacement.invoked,
"os.fstat() was not invoked")
def test_readline_os_chmod_raises_OSError(self):
"""Tests invoking FileInput.readline() when os.chmod() raises OSError.
This exception should be silently discarded."""
os_chmod_orig = os.chmod
os_chmod_replacement = UnconditionallyRaise(OSError)
try:
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t], inplace=True) as fi:
os.chmod = os_chmod_replacement
fi.readline()
finally:
os.chmod = os_chmod_orig
# sanity check to make sure that our test scenario was actually hit
self.assertTrue(os_chmod_replacement.invoked,
"os.fstat() was not invoked")
def ensure_file_exists(filename):
    """Ensure file exists and is not empty, otherwise raise an IOError.
    :type filename: string
    :param filename: file to check"""
    if not os.path.exists(filename):
        raise IOError("File %s doesn't exist or was not correctly created"
                      % filename)
    if os.path.getsize(filename) == 0:
        raise IOError("File %s is empty" % filename)
    (shortname, extension) = os.path.splitext(filename)
    if sys.platform == 'win32' and extension == '.seg':
        import fileinput
        # With inplace=1, stdout is redirected into the file, so print()
        # writes the converted lines back in place of the originals.
        for line in fileinput.FileInput(filename, inplace=1):
            print(line.replace("\\\\", "/"), end='')
def download_with_info_file(self, info_filename):
with contextlib.closing(fileinput.FileInput(
[info_filename], mode='r',
openhook=fileinput.hook_encoded('utf-8'))) as f:
# FileInput doesn't have a read() method, so we can't pass it to json.load
info = self.filter_requested_info(json.loads('\n'.join(f)))
try:
self.process_ie_result(info, download=True)
except DownloadError:
webpage_url = info.get('webpage_url')
if webpage_url is not None:
self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
return self.download([webpage_url])
else:
raise
return self._download_retcode
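The comment about FileInput lacking a read() method is why the lines are joined before parsing: the full JSON document has to be reassembled from the yielded lines before json.loads() can handle it. A stripped-down sketch of that pattern, with a made-up file name:

import contextlib
import fileinput
import json

# Hypothetical .info.json file written by an earlier step of the downloader.
info_filename = 'video.info.json'

with contextlib.closing(fileinput.FileInput(
        [info_filename], mode='r',
        openhook=fileinput.hook_encoded('utf-8'))) as f:
    # Rebuild the full document from the yielded lines, then parse it.
    info = json.loads('\n'.join(f))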
def test_zero_byte_files(self):
t1 = t2 = t3 = t4 = None
try:
t1 = writeTmp(1, [""])
t2 = writeTmp(2, [""])
t3 = writeTmp(3, ["The only line there is.\n"])
t4 = writeTmp(4, [""])
fi = FileInput(files=(t1, t2, t3, t4))
line = fi.readline()
self.assertEqual(line, 'The only line there is.\n')
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 1)
self.assertEqual(fi.filename(), t3)
line = fi.readline()
self.assertFalse(line)
self.assertEqual(fi.lineno(), 1)
self.assertEqual(fi.filelineno(), 0)
self.assertEqual(fi.filename(), t4)
fi.close()
finally:
remove_tempfiles(t1, t2, t3, t4)
def test_opening_mode(self):
try:
# invalid mode, should raise ValueError
fi = FileInput(mode="w")
self.fail("FileInput should reject invalid mode argument")
except ValueError:
pass
t1 = None
try:
# try opening in universal newline mode
t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
with check_warnings(('', DeprecationWarning)):
fi = FileInput(files=t1, mode="U")
with check_warnings(('', DeprecationWarning)):
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
finally:
remove_tempfiles(t1)
def test_fileno_when_ValueError_raised(self):
class FilenoRaisesValueError(UnconditionallyRaise):
def __init__(self):
UnconditionallyRaise.__init__(self, ValueError)
def fileno(self):
self.__call__()
unconditionally_raise_ValueError = FilenoRaisesValueError()
t = writeTmp(1, ["\n"])
self.addCleanup(remove_tempfiles, t)
with FileInput(files=[t]) as fi:
file_backup = fi._file
try:
fi._file = unconditionally_raise_ValueError
result = fi.fileno()
finally:
fi._file = file_backup # make sure the file gets cleaned up
# sanity check to make sure that our test scenario was actually hit
self.assertTrue(unconditionally_raise_ValueError.invoked,
"_file.fileno() was not invoked")
self.assertEqual(result, -1, "fileno() should return -1")
def test_modes(self):
with open(TESTFN, 'wb') as f:
# UTF-7 is a convenient, seldom used encoding
f.write(b'A\nB\r\nC\rD+IKw-')
self.addCleanup(safe_unlink, TESTFN)
def check(mode, expected_lines):
with FileInput(files=TESTFN, mode=mode,
openhook=hook_encoded('utf-7')) as fi:
lines = list(fi)
self.assertEqual(lines, expected_lines)
check('r', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
with self.assertWarns(DeprecationWarning):
check('rU', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
with self.assertWarns(DeprecationWarning):
check('U', ['A\n', 'B\n', 'C\n', 'D\u20ac'])
with self.assertRaises(ValueError):
check('rb', ['A\n', 'B\r\n', 'C\r', 'D\u20ac'])
def __upgrade_replace_text__(path, text, replace):
with fileinput.FileInput(path, inplace=True, backup=".bak") as _file:
for line in _file:
print(line.replace(text, replace), end='')
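__upgrade_replace_text__ relies on fileinput's inplace mode: while the loop runs, standard output is redirected into the file being read, so print() writes each (possibly modified) line back, and backup=".bak" keeps the untouched original beside it. A self-contained sketch of the same idea with a made-up file name and strings:

import fileinput

# Hypothetical settings file; every "DEBUG = True" becomes "DEBUG = False".
# With inplace=True, print() output replaces the file's contents while the
# original is preserved as settings.py.bak.
with fileinput.FileInput("settings.py", inplace=True, backup=".bak") as _file:
    for line in _file:
        print(line.replace("DEBUG = True", "DEBUG = False"), end='')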
def calibration_correction(measurement, channel, energy):
"""
calibration_correction implements a corrected energy calibration based
on the array of channels and energies given as input. It performs a
least squares regression fit of the channels and energies and implements
the new energy calibration in a newly generated .Spe file. The new
spectra file will contain the same information with only the old
calibration changed.
"""
cal_file = sh.copyfile(measurement, os.path.splitext(measurement)[0] +
'_recal.Spe')
fix_measurement = SPEFile.SPEFile(cal_file)
fix_measurement.read()
old_cal = (str(float(fix_measurement.energy_cal[0])) + ' ' +
str(float(fix_measurement.energy_cal[1])))
a_matrix = np.vstack([channel, np.ones(len(channel))]).T
calibration_line = np.linalg.lstsq(a_matrix, energy)
e0 = float(calibration_line[0][1])
eslope = float(calibration_line[0][0])
new_cal = str(float(e0)) + ' ' + str(float(eslope))
with fileinput.FileInput(cal_file, inplace=1) as file:
for line in file:
print(line.replace(old_cal, new_cal).rstrip())
return cal_file
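The core of calibration_correction() is an ordinary least-squares fit of energy against channel number; the resulting intercept and slope are the two numbers written onto the spectrum file's calibration line. A minimal sketch of just that fit, with made-up calibration points:

import numpy as np

# Hypothetical calibration peaks: channel positions and their known energies (keV).
channel = np.array([244.0, 1321.0, 2756.0])
energy = np.array([121.8, 661.7, 1332.5])

# Fit energy = eslope * channel + e0, as calibration_correction() does.
a_matrix = np.vstack([channel, np.ones(len(channel))]).T
(eslope, e0), *_ = np.linalg.lstsq(a_matrix, energy, rcond=None)

# String that replaces the old calibration line in the .Spe file.
new_cal = '{0} {1}'.format(float(e0), float(eslope))
print(new_cal)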
def complement( reader, lens ):
# Handle any ValueError, IndexError and OverflowError exceptions that may be thrown when
# the bitsets are being created by skipping the problem lines
complement_reader = BitsetSafeReaderWrapper( reader, lens=lens )
bitsets = complement_reader.binned_bitsets( upstream_pad=0, downstream_pad=0, lens=lens )
# NOT them all
for key, value in bitsets.items():
value.invert()
# Read remaining intervals and subtract
for chrom in bitsets:
bitset = bitsets[chrom]
out_intervals = bits_set_in_range( bitset, 0, lens.get( chrom, MAX ) )
try:
# Write the intervals
for start, end in out_intervals:
fields = ["." for x in range(max(complement_reader.chrom_col, complement_reader.start_col, complement_reader.end_col)+1)]
# default the column to a + if it exists
if complement_reader.strand_col < len( fields ) and complement_reader.strand_col >= 0:
fields[complement_reader.strand_col] = "+"
fields[complement_reader.chrom_col] = chrom
fields[complement_reader.start_col] = start
fields[complement_reader.end_col] = end
new_interval = GenomicInterval(complement_reader, fields, complement_reader.chrom_col, complement_reader.start_col, complement_reader.end_col, complement_reader.strand_col, "+")
yield new_interval
except IndexError as e:
complement_reader.skipped += 1
# no reason to stuff an entire bad file into memory
if complement_reader.skipped < 10:
complement_reader.skipped_lines.append( ( complement_reader.linenum, complement_reader.current_line, str( e ) ) )
continue
# def main():
# # test it all out
# f1 = fileinput.FileInput("dataset_7.dat")
# g1 = GenomicIntervalReader(f1)
# for interval in complement(g1,{"chr":16000000}):
# print "\t".join(interval)
#
# if __name__ == "__main__":
# main()
def test_files_that_dont_end_with_newline(self):
t1 = t2 = None
try:
t1 = writeTmp(1, ["A\nB\nC"])
t2 = writeTmp(2, ["D\nE\nF"])
fi = FileInput(files=(t1, t2))
lines = list(fi)
self.assertEqual(lines, ["A\n", "B\n", "C", "D\n", "E\n", "F"])
self.assertEqual(fi.filelineno(), 3)
self.assertEqual(fi.lineno(), 6)
finally:
remove_tempfiles(t1, t2)
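For reference, the two counters checked above differ in scope: lineno() keeps counting across all input files, while filelineno() restarts at 1 for each file. A tiny sketch with made-up file names:

import fileinput

# Hypothetical input files, a few lines each, created by some earlier step.
with fileinput.input(files=('part1.txt', 'part2.txt')) as fi:
    for line in fi:
        print(fileinput.filename(), fileinput.lineno(),
              fileinput.filelineno(), repr(line))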
## def test_unicode_filenames(self):
## # XXX A unicode string is always returned by writeTmp.
## # So is this needed?
## try:
## t1 = writeTmp(1, ["A\nB"])
## encoding = sys.getfilesystemencoding()
## if encoding is None:
## encoding = 'ascii'
## fi = FileInput(files=str(t1, encoding))
## lines = list(fi)
## self.assertEqual(lines, ["A\n", "B"])
## finally:
## remove_tempfiles(t1)