python类BufferedIOBase()的实例源码

saxutils.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
saxutils.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    class UnbufferedTextIOWrapper(io.TextIOWrapper):
        def write(self, s):
            super(UnbufferedTextIOWrapper, self).write(s)
            self.flush()
    return UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
saxutils.py 文件源码 项目:empyrion-python-api 作者: huhlig 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
teepty.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _stdin_filename(self, stdin):
        if stdin is None: 
            rtn = None
        elif isinstance(stdin, io.FileIO) and os.path.isfile(stdin.name):
            rtn = stdin.name
        elif isinstance(stdin, (io.BufferedIOBase, str, bytes)):
            self._temp_stdin = tsi = tempfile.NamedTemporaryFile()
            rtn = tsi.name
        else:
            raise ValueError('stdin not understood {0!r}'.format(stdin))
        return rtn
stack.py 文件源码 项目:TransIP-STACK-API 作者: Paradoxis 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def download_into(self, file: str, buffer: Union[BufferedIOBase, BytesIO]=None, remote_path: str=None) -> IOBase:
        """
        Download a file from your STACK account
        :param file: File name to download
        :param remote_path: Path to find the file in
        :param buffer: Buffer to download into (BytesIO, StringIO, file pointer)
        :return: BytesIO buffer
        """
        if not remote_path:
            remote_path = self.__cwd

        file = join(remote_path, file.lstrip("/"))

        if not buffer:
            buffer = BytesIO()

        if not isinstance(buffer, BufferedIOBase):
            raise StackException("Download buffer must be a binary IO type, please use BytesIO or open your file in 'rb' mode.")

        try:
            self.webdav.download_to(buffer, file.lstrip("/"))
            buffer.seek(0)
            return buffer

        except WebDavException as e:
            raise StackException(e)
bz2.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def readinto(self, b):
        """Read up to len(b) bytes into b.

        Returns the number of bytes read (0 for EOF).
        """
        with self._lock:
            return io.BufferedIOBase.readinto(self, b)
bz2.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def readlines(self, size=-1):
        """Read a list of lines of uncompressed bytes from the file.

        size can be specified to control the number of lines read: no
        further lines will be read once the total size of the lines read
        so far equals or exceeds size.
        """
        if not isinstance(size, int):
            if not hasattr(size, "__index__"):
                raise TypeError("Integer argument expected")
            size = size.__index__()
        with self._lock:
            return io.BufferedIOBase.readlines(self, size)
bz2.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def writelines(self, seq):
        """Write a sequence of byte strings to the file.

        Returns the number of uncompressed bytes written.
        seq can be any iterable yielding byte strings.

        Line separators are not added between the written byte strings.
        """
        with self._lock:
            return io.BufferedIOBase.writelines(self, seq)

    # Rewind the file to the beginning of the data stream.
test_subprocess.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_io_buffered_by_default(self):
        p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        try:
            self.assertIsInstance(p.stdin, io.BufferedIOBase)
            self.assertIsInstance(p.stdout, io.BufferedIOBase)
            self.assertIsInstance(p.stderr, io.BufferedIOBase)
        finally:
            p.stdin.close()
            p.stdout.close()
            p.stderr.close()
            p.wait()
saxutils.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        return sys.stdout

    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out

    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out

    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
saxutils.py 文件源码 项目:Docker-XX-Net 作者: kuanghy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
saxutils.py 文件源码 项目:kind2anki 作者: prz3m 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    if out is None:
        import sys
        out = sys.stdout

    if isinstance(out, io.RawIOBase):
        buffer = io.BufferedIOBase(out)
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
zipfile.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
models.py 文件源码 项目:pyfc4 作者: ghukill 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _prep_binary_content(self):

        '''     
        Sets delivery method of either payload or header
        Favors Content-Location header if set

        Args:
            None

        Returns:
            None: sets attributes in self.binary and headers
        '''

        # nothing present
        if not self.data and not self.location and 'Content-Location' not in self.resource.headers.keys():
            raise Exception('creating/updating NonRDFSource requires content from self.binary.data, self.binary.location, or the Content-Location header')

        elif 'Content-Location' in self.resource.headers.keys():
            logger.debug('Content-Location header found, using')
            self.delivery = 'header'

        # if Content-Location is not set, look for self.data_location then self.data
        elif 'Content-Location' not in self.resource.headers.keys():

            # data_location set, trumps Content self.data
            if self.location:
                # set appropriate header
                self.resource.headers['Content-Location'] = self.location
                self.delivery = 'header'

            # data attribute is plain text, binary, or file-like object
            elif self.data:

                # if file-like object, set flag for api.http_request
                if isinstance(self.data, io.BufferedIOBase):
                    logger.debug('detected file-like object')
                    self.delivery = 'payload'

                # else, just bytes
                else:
                    logger.debug('detected bytes')
                    self.delivery = 'payload'
zipfile.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
zipfile.py 文件源码 项目:Parallel.GAMIT 作者: demiangomez 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
zipfile.py 文件源码 项目:ApkParser 作者: yigitozgumus 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
zipfile.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
zipfile.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find('\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = ''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == '':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + '\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line
zipfile.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def readline(self, limit=-1):
        """Read and return a line from the stream.

        If limit is specified, at most limit bytes will be read.
        """

        if not self._universal and limit < 0:
            # Shortcut common case - newline found in buffer.
            i = self._readbuffer.find(b'\n', self._offset) + 1
            if i > 0:
                line = self._readbuffer[self._offset: i]
                self._offset = i
                return line

        if not self._universal:
            return io.BufferedIOBase.readline(self, limit)

        line = b''
        while limit < 0 or len(line) < limit:
            readahead = self.peek(2)
            if readahead == b'':
                return line

            #
            # Search for universal newlines or line chunks.
            #
            # The pattern returns either a line chunk or a newline, but not
            # both. Combined with peek(2), we are assured that the sequence
            # '\r\n' is always retrieved completely and never split into
            # separate newlines - '\r', '\n' due to coincidental readaheads.
            #
            match = self.PATTERN.search(readahead)
            newline = match.group('newline')
            if newline is not None:
                if self.newlines is None:
                    self.newlines = []
                if newline not in self.newlines:
                    self.newlines.append(newline)
                self._offset += len(newline)
                return line + b'\n'

            chunk = match.group('chunk')
            if limit >= 0:
                chunk = chunk[: limit - len(line)]

            self._offset += len(chunk)
            line += chunk

        return line


问题


面经


文章

微信
公众号

扫码关注公众号