python类FileInput()的实例源码

YoutubeDL.py 文件源码 项目:Qyoutube-dl 作者: lzambella 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def download_with_info_file(self, info_filename):
        with contextlib.closing(fileinput.FileInput(
                [info_filename], mode='r',
                openhook=fileinput.hook_encoded('utf-8'))) as f:
            # FileInput doesn't have a read method, we can't call json.load
            info = self.filter_requested_info(json.loads('\n'.join(f)))
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is not None:
                self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
                return self.download([webpage_url])
            else:
                raise
        return self._download_retcode
YoutubeDL.py 文件源码 项目:youtube_downloader 作者: aksinghdce 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def download_with_info_file(self, info_filename):
        with contextlib.closing(fileinput.FileInput(
                [info_filename], mode='r',
                openhook=fileinput.hook_encoded('utf-8'))) as f:
            # FileInput doesn't have a read method, we can't call json.load
            info = self.filter_requested_info(json.loads('\n'.join(f)))
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is not None:
                self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
                return self.download([webpage_url])
            else:
                raise
        return self._download_retcode
cli.py 文件源码 项目:sbds 作者: steemit 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def load_blocks_from_checkpoints(checkpoints_path, start, end):
    """Load blocks from checkpoints"""

    checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
        path=checkpoints_path, start=start, end=end)
    total_blocks_to_load = end - start

    with fileinput.FileInput(
            mode='r',
            files=checkpoint_set.checkpoint_paths,
            openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:

        blocks = toolz.itertoolz.drop(checkpoint_set.initial_checkpoint_offset,
                                      blocks)
        if total_blocks_to_load > 0:
            with click.open_file('-', 'w', encoding='utf8') as f:
                for i, block in enumerate(blocks, 1):
                    click.echo(block.strip().encode('utf8'), file=f)
                    if i == total_blocks_to_load:
                        break
        else:
            with click.open_file('-', 'w', encoding='utf8') as f:
                for block in blocks:
                    click.echo(block.strip().encode('utf8'), file=f)
cli.py 文件源码 项目:sbds 作者: steemit 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_checkpoint_access(ctx, checkpoints_path):
    """Test checkpoints access"""
    try:
        checkpoint_set = sbds.checkpoints.required_checkpoints_for_range(
            path=checkpoints_path, start=1, end=0)
        with fileinput.FileInput(
                mode='r',
                files=checkpoint_set.checkpoint_paths,
                openhook=checkpoint_opener_wrapper(encoding='utf8')) as blocks:

            for block in blocks:
                block_num = json.loads(block)['block_num']
                if block_num:
                    click.echo(
                        'Success: loaded block %s' % block_num, err=True)
                    ctx.exit(code=0)
                else:
                    click.echo('Failed to load block', err=True)
                    ctx.exit(code=127)
    except Exception as e:
        click.echo('Fail: %s' % e, err=True)
        ctx.exit(code=127)
YoutubeDL.py 文件源码 项目:optimalvibes 作者: littlemika 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def download_with_info_file(self, info_filename):
        with contextlib.closing(fileinput.FileInput(
                [info_filename], mode='r',
                openhook=fileinput.hook_encoded('utf-8'))) as f:
            # FileInput doesn't have a read method, we can't call json.load
            info = self.filter_requested_info(json.loads('\n'.join(f)))
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is not None:
                self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
                return self.download([webpage_url])
            else:
                raise
        return self._download_retcode
YoutubeDL.py 文件源码 项目:tvalacarta 作者: tvalacarta 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def download_with_info_file(self, info_filename):
        with contextlib.closing(fileinput.FileInput(
                [info_filename], mode='r',
                openhook=fileinput.hook_encoded('utf-8'))) as f:
            # FileInput doesn't have a read method, we can't call json.load
            info = self.filter_requested_info(json.loads('\n'.join(f)))
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is not None:
                self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
                return self.download([webpage_url])
            else:
                raise
        return self._download_retcode
find_clusters.py 文件源码 项目:bx-python 作者: bxlab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def getlines( self ):
        if self.left:
            for line in self.left.getlines():
                yield line
        if len(self.lines) >= minregions:
            for line in self.lines:
                yield line
        if self.right:
            for line in self.right.getlines():
                yield line

## def main():
##     f1 = fileinput.FileInput("big.bed")
##     g1 = GenomicIntervalReader(f1)
##     returntree, extra = find_clusters(g1, mincols=50)
##     print "All found"
##     for chrom, value in returntree.items():
##         for start, end in value.getregions():
##             print chrom+"\t"+str(start)+"\t"+str(end)
##         for line in value.getlines():
##             print "Line:\t"+str(line)

## main()
test_fileinput.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_zero_byte_files(self):
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)
test_fileinput.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_opening_mode(self):
        try:
            # invalid mode, should raise ValueError
            fi = FileInput(mode="w")
            self.fail("FileInput should reject invalid mode argument")
        except ValueError:
            pass
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            fi = FileInput(files=t1, mode="U")
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)
test_fileinput.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_file_opening_hook(self):
        try:
            # cannot use openhook and inplace mode
            fi = FileInput(inplace=1, openhook=lambda f, m: None)
            self.fail("FileInput should raise if both inplace "
                             "and openhook arguments are given")
        except ValueError:
            pass
        try:
            fi = FileInput(openhook=1)
            self.fail("FileInput should check openhook for being callable")
        except ValueError:
            pass
        # XXX The rot13 codec was removed.
        #     So this test needs to be changed to use something else.
        #     (Or perhaps the API needs to change so we can just pass
        #     an encoding rather than using a hook?)
##         try:
##             t1 = writeTmp(1, ["A\nB"], mode="wb")
##             fi = FileInput(files=t1, openhook=hook_encoded("rot13"))
##             lines = list(fi)
##             self.assertEqual(lines, ["N\n", "O"])
##         finally:
##             remove_tempfiles(t1)
test_fileinput.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_zero_byte_files(self):
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)
test_fileinput.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write('A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write('123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write('\x80')
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
        # The most likely failure is a UnicodeDecodeError due to the entire
        # file being read when it shouldn't have been.
        self.assertEqual(fi.readline(), u'A\n')
        self.assertEqual(fi.readline(), u'B\r\n')
        self.assertEqual(fi.readline(), u'C\r')
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        fi.close()
test_fileinput.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_modes(self):
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            lines = list(fi)
            fi.close()
            self.assertEqual(lines, expected_lines)

        check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
test_fileinput.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_zero_byte_files(self):
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)
test_fileinput.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write('A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write('123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write('\x80')
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'))
        # The most likely failure is a UnicodeDecodeError due to the entire
        # file being read when it shouldn't have been.
        self.assertEqual(fi.readline(), u'A\n')
        self.assertEqual(fi.readline(), u'B\r\n')
        self.assertEqual(fi.readline(), u'C\r')
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        fi.close()
test_fileinput.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_modes(self):
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            lines = list(fi)
            fi.close()
            self.assertEqual(lines, expected_lines)

        check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
YoutubeDL.py 文件源码 项目:kodi-plugin.video.ted-talks-chinese 作者: daineseh 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def download_with_info_file(self, info_filename):
        with contextlib.closing(fileinput.FileInput(
                [info_filename], mode='r',
                openhook=fileinput.hook_encoded('utf-8'))) as f:
            # FileInput doesn't have a read method, we can't call json.load
            info = self.filter_requested_info(json.loads('\n'.join(f)))
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is not None:
                self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
                return self.download([webpage_url])
            else:
                raise
        return self._download_retcode
YoutubeDL.py 文件源码 项目:GUIYoutube 作者: coltking 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def download_with_info_file(self, info_filename):
        with contextlib.closing(fileinput.FileInput(
                [info_filename], mode='r',
                openhook=fileinput.hook_encoded('utf-8'))) as f:
            # FileInput doesn't have a read method, we can't call json.load
            info = self.filter_requested_info(json.loads('\n'.join(f)))
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is not None:
                self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
                return self.download([webpage_url])
            else:
                raise
        return self._download_retcode
test_fileinput.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_zero_byte_files(self):
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)
test_fileinput.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_opening_mode(self):
        try:
            # invalid mode, should raise ValueError
            fi = FileInput(mode="w")
            self.fail("FileInput should reject invalid mode argument")
        except ValueError:
            pass
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            fi = FileInput(files=t1, mode="U")
            lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)
test_fileinput.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_readline_os_fstat_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.fstat() raises OSError.
           This exception should be silently discarded."""

        os_fstat_orig = os.fstat
        os_fstat_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.fstat = os_fstat_replacement
                fi.readline()
        finally:
            os.fstat = os_fstat_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_fstat_replacement.invoked,
                        "os.fstat() was not invoked")
test_fileinput.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_readline_os_chmod_raises_OSError(self):
        """Tests invoking FileInput.readline() when os.chmod() raises OSError.
           This exception should be silently discarded."""

        os_chmod_orig = os.chmod
        os_chmod_replacement = UnconditionallyRaise(OSError)
        try:
            t = writeTmp(1, ["\n"])
            self.addCleanup(remove_tempfiles, t)
            with FileInput(files=[t], inplace=True) as fi:
                os.chmod = os_chmod_replacement
                fi.readline()
        finally:
            os.chmod = os_chmod_orig

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(os_chmod_replacement.invoked,
                        "os.fstat() was not invoked")
test_fileinput.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_fileno_when_ValueError_raised(self):
        class FilenoRaisesValueError(UnconditionallyRaise):
            def __init__(self):
                UnconditionallyRaise.__init__(self, ValueError)
            def fileno(self):
                self.__call__()

        unconditionally_raise_ValueError = FilenoRaisesValueError()
        t = writeTmp(1, ["\n"])
        self.addCleanup(remove_tempfiles, t)
        with FileInput(files=[t]) as fi:
            file_backup = fi._file
            try:
                fi._file = unconditionally_raise_ValueError
                result = fi.fileno()
            finally:
                fi._file = file_backup # make sure the file gets cleaned up

        # sanity check to make sure that our test scenario was actually hit
        self.assertTrue(unconditionally_raise_ValueError.invoked,
                        "_file.fileno() was not invoked")

        self.assertEqual(result, -1, "fileno() should return -1")
YoutubeDL.py 文件源码 项目:Yv-dl 作者: r0oth3x49 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def download_with_info_file(self, info_filename):
        with contextlib.closing(fileinput.FileInput(
                [info_filename], mode='r',
                openhook=fileinput.hook_encoded('utf-8'))) as f:
            # FileInput doesn't have a read method, we can't call json.load
            info = self.filter_requested_info(json.loads('\n'.join(f)))
        try:
            self.process_ie_result(info, download=True)
        except DownloadError:
            webpage_url = info.get('webpage_url')
            if webpage_url is not None:
                self.report_warning('The info failed to download, trying with "%s"' % webpage_url)
                return self.download([webpage_url])
            else:
                raise
        return self._download_retcode
indexer.py 文件源码 项目:tino-thesis 作者: Tino92 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def read_and_clean_files(clueweb_file, ann_file, data_dir, ann_dir):
    """
    Read file from data_dir and ann_dir, replace entity mentions and clean records in that file
    :param clueweb_file:
    :param ann_file:
    :param data_dir: Warc files directory
    :param ann_dir: Annotations directory
    :return: {'record_id': record_id,
        'replaced_record': cleaned_replaced_record,
        'cleaned_record': cleaned_record}
    """
    annotation_input = fileinput.FileInput(os.path.join(ann_dir, ann_file), openhook=fileinput.hook_compressed)
    annotation_list = []
    for line in annotation_input:
    annotation_list.append(Annotation.parse_annotation(line))

    warc_path = os.path.join(data_dir, clueweb_file)
    warc_file = warc.open(warc_path)
    print "Replacing entity mentions for ", clueweb_file, ":", ann_file, "..."
    start = time.time()
    warc_entry = WarcEntry(warc_path, warc_file, annotation_list)
    cleaned_records = warc_entry.replace_entity_mentions()
    end = time.time()
    print "Time used: ", end - start
    warc_file.close()
    return cleaned_records
test_fileinput.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_zero_byte_files(self):
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)
test_fileinput.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_readline(self):
        with open(TESTFN, 'wb') as f:
            f.write('A\nB\r\nC\r')
            # Fill TextIOWrapper buffer.
            f.write('123456789\n' * 1000)
            # Issue #20501: readline() shouldn't read whole file.
            f.write('\x80')
        self.addCleanup(safe_unlink, TESTFN)

        fi = FileInput(files=TESTFN, openhook=hook_encoded('ascii'), bufsize=8)
        # The most likely failure is a UnicodeDecodeError due to the entire
        # file being read when it shouldn't have been.
        self.assertEqual(fi.readline(), u'A\n')
        self.assertEqual(fi.readline(), u'B\r\n')
        self.assertEqual(fi.readline(), u'C\r')
        with self.assertRaises(UnicodeDecodeError):
            # Read to the end of file.
            list(fi)
        fi.close()
test_fileinput.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_modes(self):
        with open(TESTFN, 'wb') as f:
            # UTF-7 is a convenient, seldom used encoding
            f.write('A\nB\r\nC\rD+IKw-')
        self.addCleanup(safe_unlink, TESTFN)

        def check(mode, expected_lines):
            fi = FileInput(files=TESTFN, mode=mode,
                           openhook=hook_encoded('utf-7'))
            lines = list(fi)
            fi.close()
            self.assertEqual(lines, expected_lines)

        check('r', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rU', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('U', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
        check('rb', [u'A\n', u'B\r\n', u'C\r', u'D\u20ac'])
test_fileinput.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_zero_byte_files(self):
        t1 = t2 = t3 = t4 = None
        try:
            t1 = writeTmp(1, [""])
            t2 = writeTmp(2, [""])
            t3 = writeTmp(3, ["The only line there is.\n"])
            t4 = writeTmp(4, [""])
            fi = FileInput(files=(t1, t2, t3, t4))

            line = fi.readline()
            self.assertEqual(line, 'The only line there is.\n')
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 1)
            self.assertEqual(fi.filename(), t3)

            line = fi.readline()
            self.assertFalse(line)
            self.assertEqual(fi.lineno(), 1)
            self.assertEqual(fi.filelineno(), 0)
            self.assertEqual(fi.filename(), t4)
            fi.close()
        finally:
            remove_tempfiles(t1, t2, t3, t4)
test_fileinput.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_opening_mode(self):
        try:
            # invalid mode, should raise ValueError
            fi = FileInput(mode="w")
            self.fail("FileInput should reject invalid mode argument")
        except ValueError:
            pass
        t1 = None
        try:
            # try opening in universal newline mode
            t1 = writeTmp(1, [b"A\nB\r\nC\rD"], mode="wb")
            with check_warnings(('', DeprecationWarning)):
                fi = FileInput(files=t1, mode="U")
            with check_warnings(('', DeprecationWarning)):
                lines = list(fi)
            self.assertEqual(lines, ["A\n", "B\n", "C\n", "D"])
        finally:
            remove_tempfiles(t1)


问题


面经


文章

微信
公众号

扫码关注公众号