python类BufferedReader()的实例源码

makefile.py 文件源码 项目:workflows.kyoyue 作者: wizyoung 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
file_utils.py 文件源码 项目:pheweb 作者: statgen 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read_gzip(filepath):
    # handles buffering # TODO: profile whether this is fast.
    with gzip.open(filepath, 'rb') as f: # leave in binary mode (default), let TextIOWrapper decode
        with io.BufferedReader(f, buffer_size=2**18) as g: # 256KB buffer
            with io.TextIOWrapper(g) as h: # bytes -> unicode
                yield h
common.py 文件源码 项目:wltrace 作者: jhshi 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, path, *args, **kwargs):
        super(WlTrace, self).__init__()

        self.path = path
        self.fh = io.BufferedReader(io.open(path, 'rb'))
        self.counter = 1

        self.pkt_queue = collections.deque()
        self.has_phy_info = False

        self.fix_timestamp = kwargs.get('fix_timestamp', False)
binary_io.py 文件源码 项目:io_scene_bfres 作者: Syroot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.reader = io.BufferedReader(self.raw)
        return self
_winconsole.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _get_text_stdin(buffer_stream):
    text_stream = _NonClosingTextIOWrapper(
        io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)),
        'utf-16-le', 'strict', line_buffering=True)
    return ConsoleStream(text_stream, buffer_stream)
_winconsole.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_text_stdin(buffer_stream):
    text_stream = _NonClosingTextIOWrapper(
        io.BufferedReader(_WindowsConsoleReader(STDIN_HANDLE)),
        'utf-16-le', 'strict', line_buffering=True)
    return ConsoleStream(text_stream, buffer_stream)
makefile.py 文件源码 项目:purelove 作者: hucmosin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
makefile.py 文件源码 项目:devsecops-example-helloworld 作者: boozallen 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
makefile.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
makefile.py 文件源码 项目:harbour-sailfinder 作者: DylanVanAssche 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
makefile.py 文件源码 项目:QXSConsolas 作者: qxsch 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
py3k.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def isfileobj(f):
        return isinstance(f, (io.FileIO, io.BufferedReader, io.BufferedWriter))
tools.py 文件源码 项目:sequana 作者: sequana 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _use_gzip(self):
        i = 0
        with gzip.open(self.filename) as gz_file:
            with io.BufferedReader(gz_file) as f:
                for line in f:
                    i += 1
        return i
makefile.py 文件源码 项目:ghostlines-robofont 作者: ghostlines 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
makefile.py 文件源码 项目:ghostlines-robofont 作者: ghostlines 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
connections.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _makefile(sock, mode):
        return io.BufferedReader(SocketIO(sock, mode))
connections.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _makefile(sock, mode):
        return io.BufferedReader(SocketIO(sock, mode))
makefile.py 文件源码 项目:Dshield 作者: ywjt 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
test_vlv.py 文件源码 项目:midi 作者: MicroTransactionsMatterToo 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        self.mock_file = MagicMock()  # type: Union[BufferedReader, MagicMock]
        self.mock_file.read = MagicMock(side_effect=[b'\x84', b'\x18', b'\x00', b'\x3C'])  # type: MagicMock
meta_events.py 文件源码 项目:midi 作者: MicroTransactionsMatterToo 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sequence_number(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
    length_bytes = bytearray(data.read(4))
    length = int.from_bytes(length_bytes, "big")
    if length != 2:
        raise EventLengthError("Sequence Number length was incorrect. It should be 2, but it was {}".format(length))
    sequence_num_raw = bytearray(data.read(2))
    sequence_num = int.from_bytes(sequence_num_raw, "big")
    return length, sequence_num, sequence_num_raw


问题


面经


文章

微信
公众号

扫码关注公众号