python类read()的实例源码

tarfile.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def fromtarfile(cls, tarfile):
        """Return the next TarInfo object from TarFile object
           tarfile.
        """
        buf = tarfile.fileobj.read(BLOCKSIZE)
        obj = cls.frombuf(buf, tarfile.encoding, tarfile.errors)
        obj.offset = tarfile.fileobj.tell() - BLOCKSIZE
        return obj._proc_member(tarfile)

    #--------------------------------------------------------------------------
    # The following are methods that are called depending on the type of a
    # member. The entry point is _proc_member() which can be overridden in a
    # subclass to add custom _proc_*() methods. A _proc_*() method MUST
    # implement the following
    # operations:
    # 1. Set self.offset_data to the position where the data blocks begin,
    #    if there is data that follows.
    # 2. Set tarfile.offset to the position where the next member's header will
    #    begin.
    # 3. Return self or another valid TarInfo object.
tarfile.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _proc_gnulong(self, tarfile):
        """Process the blocks that hold a GNU longname
           or longlink member.
        """
        buf = tarfile.fileobj.read(self._block(self.size))

        # Fetch the next header and process it.
        try:
            next = self.fromtarfile(tarfile)
        except HeaderError:
            raise SubsequentHeaderError("missing or bad subsequent header")

        # Patch the TarInfo object from the next header with
        # the longname information.
        next.offset = self.offset
        if self.type == GNUTYPE_LONGNAME:
            next.name = nts(buf, tarfile.encoding, tarfile.errors)
        elif self.type == GNUTYPE_LONGLINK:
            next.linkname = nts(buf, tarfile.encoding, tarfile.errors)

        return next
tarfile.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __next__(self):
        """Return the next item using TarFile's next() method.
           When all members have been read, set TarFile as _loaded.
        """
        # Fix for SF #1100429: Under rare circumstances it can
        # happen that getmembers() is called during iteration,
        # which will cause TarIter to stop prematurely.
        if not self.tarfile._loaded:
            tarinfo = self.tarfile.next()
            if not tarinfo:
                self.tarfile._loaded = True
                raise StopIteration
        else:
            try:
                tarinfo = self.tarfile.members[self.index]
            except IndexError:
                raise StopIteration
        self.index += 1
        return tarinfo
tarfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read(self, size=None):
        """Return the next size number of bytes from the stream.
           If size is not defined, return all bytes of the stream
           up to EOF.
        """
        if size is None:
            t = []
            while True:
                buf = self._read(self.bufsize)
                if not buf:
                    break
                t.append(buf)
            buf = "".join(t)
        else:
            buf = self._read(size)
        self.pos += len(buf)
        return buf
tarfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def fromtarfile(cls, tarfile):
        """Return the next TarInfo object from TarFile object
           tarfile.
        """
        buf = tarfile.fileobj.read(BLOCKSIZE)
        obj = cls.frombuf(buf, tarfile.encoding, tarfile.errors)
        obj.offset = tarfile.fileobj.tell() - BLOCKSIZE
        return obj._proc_member(tarfile)

    #--------------------------------------------------------------------------
    # The following are methods that are called depending on the type of a
    # member. The entry point is _proc_member() which can be overridden in a
    # subclass to add custom _proc_*() methods. A _proc_*() method MUST
    # implement the following
    # operations:
    # 1. Set self.offset_data to the position where the data blocks begin,
    #    if there is data that follows.
    # 2. Set tarfile.offset to the position where the next member's header will
    #    begin.
    # 3. Return self or another valid TarInfo object.
tarfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _proc_gnulong(self, tarfile):
        """Process the blocks that hold a GNU longname
           or longlink member.
        """
        buf = tarfile.fileobj.read(self._block(self.size))

        # Fetch the next header and process it.
        try:
            next = self.fromtarfile(tarfile)
        except HeaderError:
            raise SubsequentHeaderError("missing or bad subsequent header")

        # Patch the TarInfo object from the next header with
        # the longname information.
        next.offset = self.offset
        if self.type == GNUTYPE_LONGNAME:
            next.name = nts(buf, tarfile.encoding, tarfile.errors)
        elif self.type == GNUTYPE_LONGLINK:
            next.linkname = nts(buf, tarfile.encoding, tarfile.errors)

        return next
tarfile.py 文件源码 项目:pip-update-requirements 作者: alanhamlett 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __next__(self):
        """Return the next item using TarFile's next() method.
           When all members have been read, set TarFile as _loaded.
        """
        # Fix for SF #1100429: Under rare circumstances it can
        # happen that getmembers() is called during iteration,
        # which will cause TarIter to stop prematurely.
        if not self.tarfile._loaded:
            tarinfo = self.tarfile.next()
            if not tarinfo:
                self.tarfile._loaded = True
                raise StopIteration
        else:
            try:
                tarinfo = self.tarfile.members[self.index]
            except IndexError:
                raise StopIteration
        self.index += 1
        return tarinfo
recipe-574437.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def input(s, stdout = True, timeout = 2,
            prompt = rgx2nd(('(.+)', 'py3to2 server: \\1'), ),
            subprompt = rgx2nd(('>>> ', '', None), ),
            fndexc = re.compile('\WTraceback '),
            ):
    self = _server
    if not s: return
    SERVER.stdin.write(s)
    try:
      buf = ''
      SERVER.stdin.write("\n\nimport os, signal; os.kill(CLIENTPID, signal.SIGINT)\n")
      time.sleep(timeout)
      raise IOError('py3to2 server not responding to input: %s'%repr(s))
    except KeyboardInterrupt:
      buf = os.read(SERVERIO[0], self.bufsize)
      buf = subprompt.sub(buf)
      if prompt: buf = prompt.sub(buf)
    if fndexc.search(buf): raise IOError('py3to2 server input: %s\n%s'%(s, buf))
    if stdout: sys.stdout.write(buf)
    else: return buf
recipe-574436.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def input(s, stdout = True, timeout = 2,
            prompt = rgx2nd(('(.+)', 'py3to2 server: \\1'), ),
            subprompt = rgx2nd(('>>> ', '', None), ),
            fndexc = re.compile('\WTraceback '),
            ):
    self = _server
    if not s: return
    SERVER.stdin.write(s)
    try:
      buf = ''
      SERVER.stdin.write("\n\nimport os, signal; os.kill(CLIENTPID, signal.SIGINT)\n")
      time.sleep(timeout)
      raise IOError('py3to2 server not responding to input: %s'%repr(s))
    except KeyboardInterrupt:
      buf = os.read(SERVERIO[0], self.bufsize)
      buf = subprompt.sub(buf)
      if prompt: buf = prompt.sub(buf)
    if fndexc.search(buf): raise IOError('py3to2 server input: %s\n%s'%(s, buf))
    if stdout: sys.stdout.write(buf)
    else: return buf
__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def write(self, n=65536):
        """
        Called when it is detected the output side of this connector is ready
        to write. Reads (potentially blocks) at most n bytes and writes them to
        the output ends of this connector. If no bytes could be read, the connector
        is closed.

        :param n The maximum number of bytes to write.
        :type n int
        :returns: The actual number of bytes written.
        """
        buf = self.input.read(n)

        if buf:
            return self.output.write(buf)
        else:
            self.close()

        return 0
__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def read(self, n=65536):
        """
        Called when it is detected the input side of this connector is ready
        to read. Reads at most n bytes and writes them to the output ends of
        this connector. If no bytes could be read, the connector is closed.

        :param n The maximum number of bytes to read.
        :type n int
        :returns: The actual number of bytes read.
        """
        buf = self.input.read(n)

        if buf:
            self.output.write(buf)
            # TODO PushAdapter/Writers should return number of bytes actually
            # written.
            return len(buf)
        else:
            self.close()

        return 0
__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write(self, buf):
        """
        Write a chunk of data to the output stream in accordance with the
        chunked transfer encoding protocol.
        """
        try:
            self.conn.send(hex(len(buf))[2:].encode('utf-8'))
            self.conn.send(b'\r\n')
            self.conn.send(buf)
            self.conn.send(b'\r\n')
        except Exception:
            resp = self.conn.getresponse()
            sys.stderr.write(
                'Exception while sending HTTP chunk to %s, status was %s, '
                'message was:\n%s\n' % (self.output_spec['url'], resp.status,
                                        resp.read()))
            self.conn.close()
            self._closed = True
            raise
__init__.py 文件源码 项目:girder_worker 作者: girder 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def close(self):
        """
        Close the output stream. Called after the last data is sent.
        """
        if self._closed:
            return

        try:
            self.conn.send(b'0\r\n\r\n')
            resp = self.conn.getresponse()
            if resp.status >= 300 and resp.status < 400:
                raise Exception('Redirects are not supported for streaming '
                                'requests at this time. %d to Location: %s' % (
                                    resp.status, resp.getheader('Location')))
            if resp.status >= 400:
                raise Exception(
                    'HTTP stream output to %s failed with status %d. Response '
                    'was: %s' % (
                        self.output_spec['url'], resp.status, resp.read()))
        finally:
            self.conn.close()
process_controller.py 文件源码 项目:SublimeTerm 作者: percevalw 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def keep_reading(self):
        """Output thread method for the process

        Sends the process output to the ViewController (through OutputTranscoder)
        """
        while True:
            if self.stop:
                break
            ret = self.process.poll()
            if ret is not None:
                self.stop = True
            readable, writable, executable = select.select([self.master], [], [], 5)
            if readable:
                """ We read the new content """
                data = os.read(self.master, 1024)
                text = data.decode('UTF-8', errors='replace')
                log_debug("RAW", repr(text))
                log_debug("PID", os.getenv('BASHPID'))
                self.output_transcoder.decode(text)
            #                log_debug("{} >> {}".format(int(time.time()), repr(text)))
__init__.py 文件源码 项目:binja_dynamics 作者: nccgroup 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def run(self):
        while True :
            writefd = []
            if not self.messages.empty():
                # Expects a message to contain either the string 'exit'
                # or a line of input in a tuple: ('input', None)
                message = self.messages.get()
                if message == 'exit':
                    self.messages.task_done()
                    break
                else:
                    message, _encoding = message
                    writefd = [self.master]
            r,w,_ = select.select([self.master], writefd, [], 0)
            if r:
                # Read when the binary has new output for us (sometimes this came from us writing)
                line = os.read(self.master, 1024) # Reads up to a kilobyte at once. Should this be higher/lower?
                self.RECV_LINE.emit(line)
            if w:
                os.write(self.master, message + "\n")
                self.messages.task_done()
tarfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def read(self, size=None):
        """Return the next size number of bytes from the stream.
           If size is not defined, return all bytes of the stream
           up to EOF.
        """
        if size is None:
            t = []
            while True:
                buf = self._read(self.bufsize)
                if not buf:
                    break
                t.append(buf)
            buf = "".join(t)
        else:
            buf = self._read(size)
        self.pos += len(buf)
        return buf
tarfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def fromtarfile(cls, tarfile):
        """Return the next TarInfo object from TarFile object
           tarfile.
        """
        buf = tarfile.fileobj.read(BLOCKSIZE)
        obj = cls.frombuf(buf, tarfile.encoding, tarfile.errors)
        obj.offset = tarfile.fileobj.tell() - BLOCKSIZE
        return obj._proc_member(tarfile)

    #--------------------------------------------------------------------------
    # The following are methods that are called depending on the type of a
    # member. The entry point is _proc_member() which can be overridden in a
    # subclass to add custom _proc_*() methods. A _proc_*() method MUST
    # implement the following
    # operations:
    # 1. Set self.offset_data to the position where the data blocks begin,
    #    if there is data that follows.
    # 2. Set tarfile.offset to the position where the next member's header will
    #    begin.
    # 3. Return self or another valid TarInfo object.
tarfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _proc_gnulong(self, tarfile):
        """Process the blocks that hold a GNU longname
           or longlink member.
        """
        buf = tarfile.fileobj.read(self._block(self.size))

        # Fetch the next header and process it.
        try:
            next = self.fromtarfile(tarfile)
        except HeaderError:
            raise SubsequentHeaderError("missing or bad subsequent header")

        # Patch the TarInfo object from the next header with
        # the longname information.
        next.offset = self.offset
        if self.type == GNUTYPE_LONGNAME:
            next.name = nts(buf, tarfile.encoding, tarfile.errors)
        elif self.type == GNUTYPE_LONGLINK:
            next.linkname = nts(buf, tarfile.encoding, tarfile.errors)

        return next
tarfile.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __next__(self):
        """Return the next item using TarFile's next() method.
           When all members have been read, set TarFile as _loaded.
        """
        # Fix for SF #1100429: Under rare circumstances it can
        # happen that getmembers() is called during iteration,
        # which will cause TarIter to stop prematurely.
        if not self.tarfile._loaded:
            tarinfo = self.tarfile.next()
            if not tarinfo:
                self.tarfile._loaded = True
                raise StopIteration
        else:
            try:
                tarinfo = self.tarfile.members[self.index]
            except IndexError:
                raise StopIteration
        self.index += 1
        return tarinfo
_termui_impl.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def getchar(echo):
        if not isatty(sys.stdin):
            f = open('/dev/tty')
            fd = f.fileno()
        else:
            fd = sys.stdin.fileno()
            f = None
        try:
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = os.read(fd, 32)
                if echo and isatty(sys.stdout):
                    sys.stdout.write(ch)
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                sys.stdout.flush()
                if f is not None:
                    f.close()
        except termios.error:
            pass
        _translate_ch_to_exc(ch)
        return ch.decode(get_best_encoding(sys.stdin), 'replace')


问题


面经


文章

微信
公众号

扫码关注公众号