python类fstat()的实例源码

utils.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def super_len(o):
    """Return a best-effort length for ``o``, or ``None`` if none applies.

    Strategies are tried in order and the first one that applies wins:

    1. ``len(o)`` when ``o`` defines ``__len__``.
    2. The ``o.len`` attribute.
    3. ``st_size`` reported by ``os.fstat`` for ``o.fileno()``.
    4. ``len(o.getvalue())`` for in-memory buffers.
    """
    if hasattr(o, '__len__'):
        return len(o)

    if hasattr(o, 'len'):
        return o.len

    if hasattr(o, 'fileno'):
        has_descriptor = True
        try:
            descriptor = o.fileno()
        except io.UnsupportedOperation:
            # The object advertises fileno() but has no real descriptor;
            # fall through to the getvalue() strategy below.
            has_descriptor = False
        if has_descriptor:
            return os.fstat(descriptor).st_size

    if hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        return len(o.getvalue())

    return None
__init__.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks: https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in backwardcompat due to differences on AIX and Jython,
    that should eventually go away.

    :param path: Filesystem path to inspect.
    :returns: The ``st_uid`` of ``path``.
    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            # Always release the descriptor, even if fstat fails, so that an
            # error here cannot leak an open fd.
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError("%s is a symlink; Will not return uid for symlinks" % path)
    return file_uid
utils.py 文件源码 项目:flasky 作者: RoseOu 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def super_len(o):
    """Work out how many items/bytes ``o`` holds, trying several strategies.

    In order: ``len(o)`` if ``__len__`` exists, the ``len`` attribute, the
    ``st_size`` of the file behind ``o.fileno()``, and finally
    ``len(o.getvalue())``. Returns ``None`` when none of these applies.
    """
    if hasattr(o, '__len__'):
        return len(o)

    if hasattr(o, 'len'):
        return o.len

    if hasattr(o, 'fileno'):
        fd_available = True
        try:
            fd = o.fileno()
        except io.UnsupportedOperation:
            # No real descriptor behind this object; try getvalue() instead.
            fd_available = False
        if fd_available:
            return os.fstat(fd).st_size

    # e.g. BytesIO, cStringIO.StringIO
    return len(o.getvalue()) if hasattr(o, 'getvalue') else None
httplib.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _set_content_length(self, body, method):
        # Set the content-length based on the body. If the body is "empty", we
        # set Content-Length: 0 for methods that expect a body (RFC 7230,
        # Section 3.3.2). If the body is set for other methods, we set the
        # header provided we can figure out what the length is.
        thelen = None
        if body is None and method.upper() in _METHODS_EXPECTING_BODY:
            thelen = '0'
        elif body is not None:
            try:
                thelen = str(len(body))
            except (TypeError, AttributeError):
                # If this is a file-like object, try to
                # fstat its file descriptor
                try:
                    thelen = str(os.fstat(body.fileno()).st_size)
                except (AttributeError, OSError):
                    # Don't send a length if this failed
                    if self.debuglevel > 0: print "Cannot stat!!"

        if thelen is not None:
            self.putheader('Content-Length', thelen)
appcfg.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def GetFileLength(fh):
  """Measures a seekable stream by seeking to its end.

  Works for any stream that supports tell() and seek(), unlike os.fstat,
  which only works on file streams. The stream position on return is the
  position it had on entry.

  Args:
    fh: The stream to get the length of.

  Returns:
    The length of the stream.
  """
  saved_offset = fh.tell()

  # whence=2 positions relative to the end of the stream.
  fh.seek(0, 2)
  end_offset = fh.tell()

  # whence=0 is absolute positioning: put the caller's offset back.
  fh.seek(saved_offset, 0)
  return end_offset
client.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _set_content_length(self, body):
        # Set the content-length based on the body.
        thelen = None
        try:
            thelen = str(len(body))
        except TypeError as te:
            # If this is a file-like object, try to
            # fstat its file descriptor
            try:
                thelen = str(os.fstat(body.fileno()).st_size)
            except (AttributeError, OSError):
                # Don't send a length if this failed
                if self.debuglevel > 0: print("Cannot stat!!")

        if thelen is not None:
            self.putheader('Content-Length', thelen)
posix.py 文件源码 项目:MCSManager-fsmodule 作者: Suwings 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, devname=None):
        """Open the random device and verify it is a character device.

        :param devname: Path of the device to read from; ``/dev/urandom`` is
            used when omitted.
        :raises TypeError: If the opened path is not a character special
            device (the file is closed before raising).
        """
        self.name = "/dev/urandom" if devname is None else devname

        # Test that the device is a character special device.
        # Third argument 0 requests an unbuffered binary file object.
        device = open(self.name, "rb", 0)
        mode_bits = os.fstat(device.fileno())[stat.ST_MODE]
        if not stat.S_ISCHR(mode_bits):
            device.close()
            raise TypeError("%r is not a character special device" % (self.name,))

        self.__file = device

        BaseRNG.__init__(self)
shotgun.py 文件源码 项目:edx-video-pipeline 作者: edx 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def encode(self, params, files, boundary=None, buffer=None):
        """Serialise form fields and files as a multipart/form-data body.

        :param params: Iterable of ``(name, value)`` pairs for plain fields.
        :param files: Iterable of ``(name, file object)`` pairs; each file
            object must provide ``name``, ``fileno()``, ``seek()`` and
            ``read()``. Every file is rewound and read in full.
        :param boundary: Part delimiter; ``mimetools.choose_boundary()`` is
            used when omitted.
        :param buffer: Object exposing ``write()`` and ``getvalue()``; a new
            ``cStringIO.StringIO`` is created when omitted.
        :returns: ``(boundary, body)`` where ``body`` is ``buffer.getvalue()``.
        """
        if boundary is None:
            boundary = mimetools.choose_boundary()
        if buffer is None:
            buffer = cStringIO.StringIO()

        # Plain fields: delimiter, disposition header, blank line, value.
        for field_name, field_value in params:
            buffer.write('--%s\r\n' % boundary)
            buffer.write('Content-Disposition: form-data; name="%s"' % field_name)
            buffer.write('\r\n\r\n%s\r\n' % field_value)

        # File fields: same framing plus type/length headers and the payload.
        for field_name, upload in files:
            base_name = upload.name.split('/')[-1]
            mime_type = mimetypes.guess_type(base_name)[0] or 'application/octet-stream'
            size_in_bytes = os.fstat(upload.fileno())[stat.ST_SIZE]
            buffer.write('--%s\r\n' % boundary)
            buffer.write(
                'Content-Disposition: form-data; name="%s"; filename="%s"%s'
                % (field_name, base_name, '\r\n')
            )
            buffer.write('Content-Type: %s\r\n' % mime_type)
            buffer.write('Content-Length: %s\r\n' % size_in_bytes)
            upload.seek(0)
            buffer.write('\r\n%s\r\n' % upload.read())

        # Closing delimiter.
        buffer.write('--%s--\r\n\r\n' % boundary)
        return boundary, buffer.getvalue()
encoder.py 文件源码 项目:flickr_downloader 作者: Denisolt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def total_len(o):
    """Return the size of ``o`` using the first strategy that applies.

    Order of preference: ``len(o)``, the ``o.len`` attribute, the on-disk
    size of the file behind ``o.fileno()``, then ``len(o.getvalue())``.
    ``None`` is returned when ``o`` supports none of them.
    """
    if hasattr(o, '__len__'):
        return len(o)

    if hasattr(o, 'len'):
        return o.len

    if hasattr(o, 'fileno'):
        backed_by_fd = True
        try:
            fd = o.fileno()
        except io.UnsupportedOperation:
            # fileno() exists but there is no underlying descriptor.
            backed_by_fd = False
        if backed_by_fd:
            return os.fstat(fd).st_size

    if hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        buffered = o.getvalue()
        return len(buffered)

    return None
__init__.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :param path: Filesystem path to inspect.
    :returns: The ``st_uid`` of ``path``.
    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            # Close the descriptor on every path so a failing fstat cannot
            # leak it.
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid
genericpath.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sameopenfile(fp1, fp2):
    """Report whether two open files refer to the same underlying file.

    Both arguments are passed straight to ``os.fstat`` and the two stat
    results are compared with ``samestat``.
    """
    return samestat(os.fstat(fp1), os.fstat(fp2))


# Split a path in root and extension.
# The extension is everything starting at the last dot in the last
# pathname component; the root is everything before that.
# It is always true that root + ext == p.

# Generic implementation of splitext, to be parametrized with
# the separators
rhd.py 文件源码 项目:spyking-circus 作者: spyking-circus 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def read_qstring(fid):
    """Read a Qt style QString from the binary file ``fid``.

    The string is prefixed by a little-endian 32-bit unsigned byte count.
    A count of 0xFFFFFFFF denotes a null string and yields "". The payload
    is a sequence of little-endian 16-bit code units, one per character.

    Raises ``Exception`` if the declared byte count exceeds what is left in
    the file (the offending count is printed first).
    """
    (nbytes,) = struct.unpack('<I', fid.read(4))
    if nbytes == 0xFFFFFFFF:
        return ""

    # Bytes left after the length prefix, with one byte of slack.
    bytes_available = os.fstat(fid.fileno()).st_size - fid.tell() + 1
    if nbytes > bytes_available:
        print(nbytes)
        raise Exception('Length too long.')

    # convert length from bytes to 16-bit Unicode words
    nwords = nbytes // 2
    code_units = [struct.unpack('<H', fid.read(2))[0] for _ in range(nwords)]

    # Python 2 needs unichr() to build text from code points.
    to_char = chr if sys.version_info >= (3, 0) else unichr
    return ''.join([to_char(unit) for unit in code_units])
neuralynx.py 文件源码 项目:spyking-circus 作者: spyking-circus 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _read_from_header(self):

        folder_path       = os.path.dirname(os.path.abspath(self.file_name))
        self.all_files    = self._get_sorted_channels_()

        regexpr           = re.compile('\d+')
        self.all_channels = []
        for f in self.all_files:
            self.all_channels += [int(regexpr.findall(f)[0])]

        self.header             = self._read_header_(self.all_files[0])

        header                  = {}
        header['sampling_rate'] = float(self.header['SamplingFrequency'])        
        header['nb_channels']   = len(self.all_files)
        header['gain']          = float(self.header['ADBitVolts'])*1000000        

        self.inverse     = self.header.has_key('InputInverted') and (self.header['InputInverted'] == 'True')
        if self.inverse:
            header['gain'] *= -1

        g                = open(self.all_files[0], 'rb')
        self.size        = ((os.fstat(g.fileno()).st_size - self.NUM_HEADER_BYTES)//self.RECORD_SIZE - 1) * self.SAMPLES_PER_RECORD
        self._shape      = (self.size, header['nb_channels'])
        g.close()

        return header
__init__.py 文件源码 项目:my-first-blog 作者: AnkurBegining 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def get_path_uid(path):
    """
    Return path's uid.

    Does not follow symlinks:
        https://github.com/pypa/pip/pull/935#discussion_r5307003

    Placed this function in compat due to differences on AIX and
    Jython, that should eventually go away.

    :param path: Filesystem path to inspect.
    :returns: The ``st_uid`` of ``path``.
    :raises OSError: When path is a symlink or can't be read.
    """
    if hasattr(os, 'O_NOFOLLOW'):
        fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW)
        try:
            file_uid = os.fstat(fd).st_uid
        finally:
            # Guarantee the descriptor is closed even when fstat raises.
            os.close(fd)
    else:  # AIX and Jython
        # WARNING: time of check vulnerability, but best we can do w/o NOFOLLOW
        if not os.path.islink(path):
            # older versions of Jython don't have `os.fstat`
            file_uid = os.stat(path).st_uid
        else:
            # raise OSError for parity with os.O_NOFOLLOW above
            raise OSError(
                "%s is a symlink; Will not return uid for symlinks" % path
            )
    return file_uid
cors_server_py3.py 文件源码 项目:geojson2mvt 作者: NYCPlanning 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def send_head(self):
        """Common code for GET and HEAD commands.
        This sends the response code and MIME headers.
        Return value is either a file object (which has to be copied
        to the outputfile by the caller unless the command was HEAD,
        and must be closed by the caller under all circumstances), or
        None, in which case the caller has nothing further to do.

        Directories without a trailing slash are redirected (301);
        directories containing index.html or index.htm serve that file,
        otherwise the result of list_directory() is returned. A file that
        cannot be opened produces a 404. If anything fails after the file
        has been opened, the file is closed and the error re-raised.
        """
        path = self.translate_path(self.path)
        f = None
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # redirect browser - doing basically what apache does
                self.send_response(301)
                self.send_header("Location", self.path + "/")
                self.end_headers()
                return None
            for index in "index.html", "index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
            else:
                return self.list_directory(path)
        ctype = self.guess_type(path)
        try:
            # Always read in binary mode. Opening files in text mode may cause
            # newline translations, making the actual size of the content
            # transmitted *less* than the content-length!
            f = open(path, 'rb')
        except IOError:
            self.send_error(404, "File not found")
            return None
        try:
            self.send_response(200)
            self.send_header("Content-type", ctype)
            fs = os.fstat(f.fileno())
            self.send_header("Content-Length", str(fs.st_size))
            self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
            # Permit cross-origin requests from any origin.
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            return f
        except BaseException:
            # The caller never receives f on failure, so close it here
            # rather than leaking the descriptor.
            f.close()
            raise
cors_server.py 文件源码 项目:geojson2mvt 作者: NYCPlanning 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def send_head(self):
        """Common code for GET and HEAD commands.
        This sends the response code and MIME headers.
        Return value is either a file object (which has to be copied
        to the outputfile by the caller unless the command was HEAD,
        and must be closed by the caller under all circumstances), or
        None, in which case the caller has nothing further to do.

        Directories without a trailing slash are redirected (301);
        directories containing index.html or index.htm serve that file,
        otherwise the result of list_directory() is returned. A file that
        cannot be opened produces a 404. If anything fails after the file
        has been opened, the file is closed and the error re-raised.
        """
        path = self.translate_path(self.path)
        f = None
        if os.path.isdir(path):
            if not self.path.endswith('/'):
                # redirect browser - doing basically what apache does
                self.send_response(301)
                self.send_header("Location", self.path + "/")
                self.end_headers()
                return None
            for index in "index.html", "index.htm":
                index = os.path.join(path, index)
                if os.path.exists(index):
                    path = index
                    break
            else:
                return self.list_directory(path)
        ctype = self.guess_type(path)
        try:
            # Always read in binary mode. Opening files in text mode may cause
            # newline translations, making the actual size of the content
            # transmitted *less* than the content-length!
            f = open(path, 'rb')
        except IOError:
            self.send_error(404, "File not found")
            return None
        try:
            self.send_response(200)
            self.send_header("Content-type", ctype)
            fs = os.fstat(f.fileno())
            self.send_header("Content-Length", str(fs.st_size))
            self.send_header("Last-Modified", self.date_time_string(fs.st_mtime))
            # Permit cross-origin requests from any origin.
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            return f
        except BaseException:
            # On failure the caller never gets f, so it must be closed here
            # to avoid leaking the descriptor.
            f.close()
            raise
posixpath.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sameopenfile(fp1, fp2):
    """Tell whether two open files are the same underlying file.

    Each argument is given directly to ``os.fstat``; the resulting stat
    buffers are compared by ``samestat``.
    """
    first_stat = os.fstat(fp1)
    second_stat = os.fstat(fp2)
    return samestat(first_stat, second_stat)


# Are two stat buffers (obtained from stat, fstat or lstat)
# describing the same file?
indexer.py 文件源码 项目:ipwb 作者: oduwsdl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def askUserForEncryptionKey():
    """Prompt the user for an encryption key and return it.

    When ``DEBUG`` is truthy the fixed key ``'ipwb'`` is returned without
    prompting. Otherwise, if the stat results of fd 0 and fd 1 differ,
    output is treated as redirected: the prompt is written to stderr and
    ``raw_input`` receives an empty prompt, so the prompt text does not end
    up in the redirected output.

    :returns: The key entered by the user, or ``'ipwb'`` in debug mode.
    """
    if DEBUG:  # Allows testing instead of requiring a user prompt
        return 'ipwb'

    outputRedirected = os.fstat(0) != os.fstat(1)
    promptString = 'Enter a key for encryption: '
    if outputRedirected:  # Prevents prompt in redir output
        # Show the prompt on stderr *before* blanking it. Blanking first
        # (as the code previously did) printed only an empty line, leaving
        # the user with no prompt at all.
        print(promptString, file=sys.stderr)
        promptString = ''

    key = raw_input(promptString)

    return key
Jpeg2KImagePlugin.py 文件源码 项目:imagepaste 作者: robinchenyu 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _open(self):
        """Identify the JPEG 2000 flavour of ``self.fp`` and set up decoding.

        A 4-byte signature selects the ``"j2k"`` codec; otherwise a 12-byte
        signature selects ``"jp2"``. Sets ``self.codec``, ``self.size``,
        ``self.mode``, ``self.reduce``, ``self.layers`` and ``self.tile``.

        :raises SyntaxError: If neither signature matches, or if the size or
            mode could not be determined from the header.
        """
        sig = self.fp.read(4)
        if sig == b'\xff\x4f\xff\x51':
            self.codec = "j2k"
            self.size, self.mode = _parse_codestream(self.fp)
        else:
            sig = sig + self.fp.read(8)

            if sig == b'\x00\x00\x00\x0cjP  \x0d\x0a\x87\x0a':
                self.codec = "jp2"
                self.size, self.mode = _parse_jp2_header(self.fp)
            else:
                raise SyntaxError('not a JPEG 2000 file')

        if self.size is None or self.mode is None:
            raise SyntaxError('unable to determine size/mode')

        self.reduce = 0
        self.layers = 0

        fd = -1
        length = -1

        # Prefer a real descriptor plus fstat for the length. Failures are
        # caught as Exception rather than with a bare except, so that
        # KeyboardInterrupt and SystemExit still propagate.
        try:
            fd = self.fp.fileno()
            length = os.fstat(fd).st_size
        except Exception:
            # No usable descriptor: measure by seeking to the end and back.
            fd = -1
            try:
                pos = self.fp.tell()
                self.fp.seek(0, 2)
                length = self.fp.tell()
                self.fp.seek(pos, 0)
            except Exception:
                # Not seekable either; -1 marks the length as unknown.
                length = -1

        self.tile = [('jpeg2k', (0, 0) + self.size, 0,
                      (self.codec, self.reduce, self.layers, fd, length))]
Jpeg2KImagePlugin.py 文件源码 项目:Projects 作者: it2school 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _open(self):
        """Identify the JPEG 2000 flavour of ``self.fp`` and set up decoding.

        A 4-byte signature selects the ``"j2k"`` codec; otherwise a 12-byte
        signature selects ``"jp2"``. Sets ``self.codec``, ``self.size``,
        ``self.mode``, ``self.reduce``, ``self.layers`` and ``self.tile``
        (whose argument tuple also carries ``self.fp``).

        :raises SyntaxError: If neither signature matches, or if the size or
            mode could not be determined from the header.
        """
        sig = self.fp.read(4)
        if sig == b'\xff\x4f\xff\x51':
            self.codec = "j2k"
            self.size, self.mode = _parse_codestream(self.fp)
        else:
            sig = sig + self.fp.read(8)

            if sig == b'\x00\x00\x00\x0cjP  \x0d\x0a\x87\x0a':
                self.codec = "jp2"
                self.size, self.mode = _parse_jp2_header(self.fp)
            else:
                raise SyntaxError('not a JPEG 2000 file')

        if self.size is None or self.mode is None:
            raise SyntaxError('unable to determine size/mode')

        self.reduce = 0
        self.layers = 0

        fd = -1
        length = -1

        # Prefer a real descriptor plus fstat for the length. Failures are
        # caught as Exception rather than with a bare except, so that
        # KeyboardInterrupt and SystemExit still propagate.
        try:
            fd = self.fp.fileno()
            length = os.fstat(fd).st_size
        except Exception:
            # No usable descriptor: measure by seeking to the end and back.
            fd = -1
            try:
                pos = self.fp.tell()
                self.fp.seek(0, 2)
                length = self.fp.tell()
                self.fp.seek(pos, 0)
            except Exception:
                # Not seekable either; -1 marks the length as unknown.
                length = -1

        self.tile = [('jpeg2k', (0, 0) + self.size, 0,
                      (self.codec, self.reduce, self.layers, fd, length, self.fp))]


问题


面经


文章

微信
公众号

扫码关注公众号