Python os.lseek() 函数的实例源码

filemap.py 文件源码 项目:isar 作者: ilbers 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _lseek(file_obj, offset, whence):
    """This is a helper function which invokes 'os.lseek' for file object
    'file_obj' and with specified 'offset' and 'whence'. The 'whence'
    argument is supposed to be either '_SEEK_DATA' or '_SEEK_HOLE'. When
    there is no more data or hole starting from 'offset', this function
    returns '-1'.  Otherwise the data or hole position is returned."""

    try:
        return os.lseek(file_obj.fileno(), offset, whence)
    except OSError as err:
        # The 'lseek' system call returns the ENXIO if there is no data or
        # hole starting from the specified offset.
        if err.errno == os.errno.ENXIO:
            return -1
        elif err.errno == os.errno.EINVAL:
            raise ErrorNotSupp("the kernel or file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\"")
        else:
            raise
test_posix.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_fs_holes(self):
    """Check SEEK_DATA / SEEK_HOLE behaviour on a small, fully written file."""
    # Even when the filesystem does not report holes, an OS that supports
    # the SEEK_* constants is expected to behave consistently:
    #   os.SEEK_DATA -> the current position
    #   os.SEEK_HOLE -> the end-of-file position
    with open(support.TESTFN, 'r+b') as stream:
        stream.write(b"hello")
        stream.flush()
        length = stream.tell()
        descriptor = stream.fileno()
        try:
            for position in range(length):
                data_at = os.lseek(descriptor, position, os.SEEK_DATA)
                self.assertEqual(position, data_at)
                hole_at = os.lseek(descriptor, position, os.SEEK_HOLE)
                self.assertLessEqual(length, hole_at)
            # Seeking from the end of the file must fail for both modes.
            for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                self.assertRaises(OSError, os.lseek, descriptor, length, whence)
        except OSError:
            # Some OSs claim to support SEEK_HOLE/SEEK_DATA but do not.
            # For instance:
            # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
            raise unittest.SkipTest("OSError raised!")
test_helpers.py 文件源码 项目:ubuntu-image 作者: CanonicalLtd 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def is_sparse(path):
    """Return True when SEEK_DATA finds no data at all in the file at `path`.

    Any OSError other than ENXIO raised by the seek is propagated.
    """
    # LP: #1656371 - stat().st_blocks is not a reliable indicator: on ZFS it
    # seems to return 1 where EXT4 returns 0.  Instead of hard coding a value
    # per file system, ask SEEK_DATA for the first block of data at or after
    # position 0.  When there is none, ENXIO is raised, at least on any
    # modern Linux kernel we care about.  See lseek(2) for details.
    with open(path, 'r') as stream:
        try:
            os.lseek(stream.fileno(), 0, os.SEEK_DATA)
        except OSError as exc:
            # There is no OSError subclass for ENXIO, so compare the code.
            if exc.errno == errno.ENXIO:
                # No data anywhere in the file: it is entirely sparse.
                return True
            raise
        else:
            # The seek succeeded, so there is data in the file.
            return False
Filemap.py 文件源码 项目:bmap-tools 作者: intel 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _lseek(file_obj, offset, whence):
    """This is a helper function which invokes 'os.lseek' for file object
    'file_obj' and with specified 'offset' and 'whence'. The 'whence'
    argument is supposed to be either '_SEEK_DATA' or '_SEEK_HOLE'. When
    there is no more data or hole starting from 'offset', this function
    returns '-1'.  Otherwise the data or hole position is returned."""

    try:
        return os.lseek(file_obj.fileno(), offset, whence)
    except OSError as err:
        # The 'lseek' system call returns the ENXIO if there is no data or
        # hole starting from the specified offset.
        if err.errno == os.errno.ENXIO:
            return -1
        elif err.errno == os.errno.EINVAL:
            raise ErrorNotSupp("the kernel or file-system does not support "
                               "\"SEEK_HOLE\" and \"SEEK_DATA\"")
        else:
            raise
test_posix.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_writev(self):
    """Write three buffers with writev() and read them back as one string."""
    descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        chunks = (b'test1', b'tt2', b't3')
        written = os.writev(descriptor, chunks)
        self.assertEqual(written, 10)

        # Rewind so the read starts at the first byte just written.
        os.lseek(descriptor, 0, os.SEEK_SET)
        readback = posix.read(descriptor, 10)
        self.assertEqual(b'test1tt2t3', readback)

        # Issue #20113: empty list of buffers should not crash
        try:
            empty_result = posix.writev(descriptor, [])
        except OSError:
            # writev(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            return
        self.assertEqual(empty_result, 0)
    finally:
        os.close(descriptor)
test_posix.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_readv(self):
    """Read a known string back into three differently sized buffers."""
    descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(descriptor, b'test1tt2t3')
        # Rewind so readv() starts at the first byte just written.
        os.lseek(descriptor, 0, os.SEEK_SET)
        buffers = [bytearray(width) for width in (5, 3, 2)]
        total = posix.readv(descriptor, buffers)
        self.assertEqual(total, 10)
        filled = [bytes(item) for item in buffers]
        self.assertEqual([b'test1', b'tt2', b't3'], filled)

        # Issue #20113: empty list of buffers should not crash
        try:
            empty_result = posix.readv(descriptor, [])
        except OSError:
            # readv(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            return
        self.assertEqual(empty_result, 0)
    finally:
        os.close(descriptor)
test_posix.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_fs_holes(self):
    """Check SEEK_DATA / SEEK_HOLE behaviour on a small, fully written file."""
    # Even when the filesystem does not report holes, an OS that supports
    # the SEEK_* constants is expected to behave consistently:
    #   os.SEEK_DATA -> the current position
    #   os.SEEK_HOLE -> the end-of-file position
    with open(support.TESTFN, 'r+b') as stream:
        stream.write(b"hello")
        stream.flush()
        length = stream.tell()
        descriptor = stream.fileno()
        try:
            for position in range(length):
                data_at = os.lseek(descriptor, position, os.SEEK_DATA)
                self.assertEqual(position, data_at)
                hole_at = os.lseek(descriptor, position, os.SEEK_HOLE)
                self.assertLessEqual(length, hole_at)
            # Seeking from the end of the file must fail for both modes.
            for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                self.assertRaises(OSError, os.lseek, descriptor, length, whence)
        except OSError:
            # Some OSs claim to support SEEK_HOLE/SEEK_DATA but do not.
            # For instance:
            # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
            raise unittest.SkipTest("OSError raised!")
test_posix.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_writev(self):
    """Write three buffers with writev() and read them back as one string."""
    descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        chunks = (b'test1', b'tt2', b't3')
        written = os.writev(descriptor, chunks)
        self.assertEqual(written, 10)

        # Rewind so the read starts at the first byte just written.
        os.lseek(descriptor, 0, os.SEEK_SET)
        readback = posix.read(descriptor, 10)
        self.assertEqual(b'test1tt2t3', readback)

        # Issue #20113: empty list of buffers should not crash
        try:
            empty_result = posix.writev(descriptor, [])
        except OSError:
            # writev(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            return
        self.assertEqual(empty_result, 0)
    finally:
        os.close(descriptor)
test_posix.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_readv(self):
    """Read a known string back into three differently sized buffers."""
    descriptor = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(descriptor, b'test1tt2t3')
        # Rewind so readv() starts at the first byte just written.
        os.lseek(descriptor, 0, os.SEEK_SET)
        buffers = [bytearray(width) for width in (5, 3, 2)]
        total = posix.readv(descriptor, buffers)
        self.assertEqual(total, 10)
        filled = [bytes(item) for item in buffers]
        self.assertEqual([b'test1', b'tt2', b't3'], filled)

        # Issue #20113: empty list of buffers should not crash
        try:
            empty_result = posix.readv(descriptor, [])
        except OSError:
            # readv(fd, []) raises OSError(22, "Invalid argument")
            # on OpenIndiana
            return
        self.assertEqual(empty_result, 0)
    finally:
        os.close(descriptor)
test_posix.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_fs_holes(self):
    """Check SEEK_DATA / SEEK_HOLE behaviour on a small, fully written file."""
    # Even when the filesystem does not report holes, an OS that supports
    # the SEEK_* constants is expected to behave consistently:
    #   os.SEEK_DATA -> the current position
    #   os.SEEK_HOLE -> the end-of-file position
    with open(support.TESTFN, 'r+b') as stream:
        stream.write(b"hello")
        stream.flush()
        length = stream.tell()
        descriptor = stream.fileno()
        try:
            for position in range(length):
                data_at = os.lseek(descriptor, position, os.SEEK_DATA)
                self.assertEqual(position, data_at)
                hole_at = os.lseek(descriptor, position, os.SEEK_HOLE)
                self.assertLessEqual(length, hole_at)
            # Seeking from the end of the file must fail for both modes.
            for whence in (os.SEEK_DATA, os.SEEK_HOLE):
                self.assertRaises(OSError, os.lseek, descriptor, length, whence)
        except OSError:
            # Some OSs claim to support SEEK_HOLE/SEEK_DATA but do not.
            # For instance:
            # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
            raise unittest.SkipTest("OSError raised!")
test_util.py 文件源码 项目:NFStest 作者: thombashi 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def write_data(self, fd, offset=0, size=None, pattern=None):
    """Write data to the file given by the file descriptor

       fd:
           File descriptor
       offset:
           File offset where data will be written to [default: 0]
       size:
           Total number of bytes to write [default: --filesize option]
       pattern:
           Data pattern to write to the file [default: data_pattern default]
    """
    remaining = self.filesize if size is None else size
    position = offset

    while remaining > 0:
        # Each write call is capped at self.wsize bytes.
        chunk_len = min(self.wsize, remaining)
        os.lseek(fd, position, os.SEEK_SET)
        payload = self.data_pattern(position, chunk_len, pattern)
        written = os.write(fd, payload)
        # Advance by what was actually written, not by what was requested.
        remaining -= written
        position += written
connections.py 文件源码 项目:packetary 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _copy_stream(self, fd, url, offset):
        """Copies remote file to local.

        :param fd: the file`s descriptor
        :param url: the remote file`s url
        :param offset: the number of bytes from the beginning,
                       that will be skipped
        :return: the count of actually copied bytes
        """

        source = self.open_stream(url, offset)
        os.ftruncate(fd, offset)
        os.lseek(fd, offset, os.SEEK_SET)
        chunk_size = 16 * 1024
        size = 0
        while 1:
            chunk = source.read(chunk_size)
            if not chunk:
                break
            os.write(fd, chunk)
            size += len(chunk)
        return size
blobfs.py 文件源码 项目:blobfs 作者: mbartoli 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def read(self, path, length, offset, fh):
    """Return `length` bytes of a blob starting at `offset`.

    `path` is split on '/'; element 1 is used as the container name and
    element 2 as the blob name.  The bytes are requested from the blob
    service for the range `offset` .. `offset + length - 1`.  `fh` is only
    printed in debug mode and is otherwise unused.

    NOTE(review): the range end is presumably inclusive, which is why 1 is
    subtracted - confirm against the blob service API.
    """
    # print() with a single argument behaves the same on Python 2 and 3,
    # unlike the Python-2-only print statement.
    if debug:
        print("read:       " + path)
        print("offset:  ")
        print(offset)
        print("length: ")
        print(length)
        print(fh)
    full_path = self._full_path(path)
    # NOTE(review): this print is unconditional (not guarded by `debug`);
    # kept as-is to preserve existing output.
    print(full_path)
    import config as config
    account_name = config.STORAGE_ACCOUNT_NAME
    account_key = config.STORAGE_ACCOUNT_KEY
    path_parts = path.split('/')
    containername = path_parts[1]
    filename = path_parts[2]
    service = baseblobservice.BaseBlobService(account_name, account_key)
    blob = service.get_blob_to_bytes(containername, filename, None,
                                     offset, offset + length - 1)
    return blob.content
blobfs.py 文件源码 项目:blobfs 作者: mbartoli 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def write(self, path, buf, offset, fh):
    """Write `buf` to descriptor `fh` at byte `offset`.

    Returns the number of bytes written, as reported by os.write().
    `path` is only used for the debug message.
    """
    if debug:
        # print() with a single argument behaves the same on Python 2 and 3,
        # unlike the Python-2-only print statement.
        print("write:   " + path)
    os.lseek(fh, offset, os.SEEK_SET)
    return os.write(fh, buf)
tailbot.py 文件源码 项目:abusehelper 作者: Exploit-install 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def try_seek(fd, offset):
    """Reposition `fd`, ignoring descriptors that cannot seek.

    `offset` of None seeks to the end of the file, a non-negative offset is
    taken from the start, and a negative offset is taken from the end.
    An OSError with ESPIPE is swallowed; any other OSError propagates.
    """
    if offset is None:
        position, whence = 0, os.SEEK_END
    elif offset >= 0:
        position, whence = offset, os.SEEK_SET
    else:
        position, whence = offset, os.SEEK_END

    try:
        os.lseek(fd, position, whence)
    except OSError as ose:
        if ose.args[0] == errno.ESPIPE:
            return
        raise
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def readChunk(self, offset, length):
    """Seek self.fd to `offset` and read `length` bytes via _runAsUser.

    The two calls are handed to self.server.avatar._runAsUser as
    (function, args) pairs; whatever it returns is returned unchanged.
    """
    run_as_user = self.server.avatar._runAsUser
    seek_call = (os.lseek, (self.fd, offset, 0))
    read_call = (os.read, (self.fd, length))
    return run_as_user([seek_call, read_call])
unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def writeChunk(self, offset, data):
    """Seek self.fd to `offset` and write `data` via _runAsUser.

    The two calls are handed to self.server.avatar._runAsUser as
    (function, args) pairs; whatever it returns is returned unchanged.
    """
    run_as_user = self.server.avatar._runAsUser
    seek_call = (os.lseek, (self.fd, offset, 0))
    write_call = (os.write, (self.fd, data))
    return run_as_user([seek_call, write_call])
tools.py 文件源码 项目:python-lunrclient 作者: rackerlabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_volume(self, id):
    """
    return volume information if the argument is an id or a path

    For an existing path the result is a dict with the path and the size
    found by seeking to the end of the file; otherwise the id is looked up
    through self.volume.get().
    """
    if not exists(id):
        return self.volume.get(id)

    # The id is actually a path: seeking to the end yields the size.
    with open(id) as handle:
        size = os.lseek(handle.fileno(), 0, os.SEEK_END)
    return {'path': id, 'size': size}
vfs_manager.py 文件源码 项目:iotronic-lightning-rod 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def read(self, path, length, offset, fh):
    """Read up to `length` bytes from descriptor `fh` starting at `offset`.

    `path` is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    data = os.read(fh, length)
    return data
vfs_manager.py 文件源码 项目:iotronic-lightning-rod 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def write(self, path, buf, offset, fh):
    """Write `buf` to descriptor `fh` at `offset`; return the byte count.

    `path` is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    written = os.write(fh, buf)
    return written
vfs_manager.py 文件源码 项目:iotronic-lightning-rod 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def read(self, path, length, offset, fh):
    """Read up to `length` bytes from descriptor `fh` starting at `offset`.

    `path` is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    data = os.read(fh, length)
    return data
vfs_manager.py 文件源码 项目:iotronic-lightning-rod 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def write(self, path, buf, offset, fh):
    """Write `buf` to descriptor `fh` at `offset`; return the byte count.

    `path` is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    written = os.write(fh, buf)
    return written
vfs_library.py 文件源码 项目:iotronic-lightning-rod 作者: openstack 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def read(self, path, length, offset, fh):
    """Read up to `length` bytes from descriptor `fh` starting at `offset`.

    `path` is not used.
    """
    os.lseek(fh, offset, os.SEEK_SET)
    data = os.read(fh, length)
    return data
smbserver.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def smbComWrite(connId, smbServer, SMBCommand, recvPacket):
    """Handle an SMB_COM_WRITE request and build the response.

    The Fid from the request parameters is looked up in the connection's
    'OpenedFiles'.  For a regular file handle the data is written at the
    requested offset, unless that offset lies beyond the current end of the
    file, in which case the write is skipped.  For the pipe descriptor the
    data is sent on the stored socket instead.

    Returns a tuple ([respSMBCommand], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown Fid and STATUS_ACCESS_DENIED when
    any exception occurs while writing (the exception is logged).
    `recvPacket` is not used.
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb.SMBCommand(smb.SMB.SMB_COM_WRITE)
    respParameters = smb.SMBWriteResponse_Parameters()
    respData = ''

    comWriteParameters = smb.SMBWrite_Parameters(SMBCommand['Parameters'])
    comWriteData = smb.SMBWrite_Data(SMBCommand['Data'])

    # 'in' replaces dict.has_key(), which no longer exists on Python 3.
    if comWriteParameters['Fid'] in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][comWriteParameters['Fid']]['FileHandle']
        errorCode = STATUS_SUCCESS
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                # TODO: Handle big size files
                # If we're trying to write past the file end we just skip
                # the write call (Vista does this)
                if os.lseek(fileHandle, 0, os.SEEK_END) >= comWriteParameters['Offset']:
                    os.lseek(fileHandle, comWriteParameters['Offset'], os.SEEK_SET)
                    os.write(fileHandle, comWriteData['Data'])
            else:
                sock = connData['OpenedFiles'][comWriteParameters['Fid']]['Socket']
                sock.send(comWriteData['Data'])
            # The requested count is echoed back, even when the write was
            # skipped above.
            respParameters['Count'] = comWriteParameters['Count']
        except Exception as e:
            smbServer.log('smbComWrite: %s' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    # On failure the response carries no parameters and no data.
    if errorCode > 0:
        respParameters = ''
        respData = ''

    respSMBCommand['Parameters'] = respParameters
    respSMBCommand['Data'] = respData
    smbServer.setConnectionData(connId, connData)

    return [respSMBCommand], None, errorCode
smbserver.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def smbComRead(connId, smbServer, SMBCommand, recvPacket):
    """Handle an SMB_COM_READ request and build the response.

    The Fid from the request parameters is looked up in the connection's
    'OpenedFiles'.  For a regular file handle up to 'Count' bytes are read
    from the requested offset; for the pipe descriptor up to 'Count' bytes
    are received from the stored socket instead.

    Returns a tuple ([respSMBCommand], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown Fid and STATUS_ACCESS_DENIED when
    any exception occurs while reading (the exception is logged).
    `recvPacket` is not used.
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb.SMBCommand(smb.SMB.SMB_COM_READ)
    respParameters = smb.SMBReadResponse_Parameters()
    respData = smb.SMBReadResponse_Data()

    comReadParameters = smb.SMBRead_Parameters(SMBCommand['Parameters'])

    # 'in' replaces dict.has_key(), which no longer exists on Python 3.
    if comReadParameters['Fid'] in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][comReadParameters['Fid']]['FileHandle']
        errorCode = STATUS_SUCCESS
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                # TODO: Handle big size files
                os.lseek(fileHandle, comReadParameters['Offset'], os.SEEK_SET)
                content = os.read(fileHandle, comReadParameters['Count'])
            else:
                sock = connData['OpenedFiles'][comReadParameters['Fid']]['Socket']
                content = sock.recv(comReadParameters['Count'])
            respParameters['Count'] = len(content)
            respData['DataLength'] = len(content)
            respData['Data'] = content
        except Exception as e:
            smbServer.log('smbComRead: %s ' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    # On failure the response carries no parameters and no data.
    if errorCode > 0:
        respParameters = ''
        respData = ''

    respSMBCommand['Parameters'] = respParameters
    respSMBCommand['Data'] = respData
    smbServer.setConnectionData(connId, connData)

    return [respSMBCommand], None, errorCode
smbserver.py 文件源码 项目:PiBunny 作者: tholum 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def smb2Read(connId, smbServer, recvPacket):
    """Handle an SMB2 READ request and build the response.

    The FileID from the request is looked up in the connection's
    'OpenedFiles'.  When the request's FileID is sixteen 0xff bytes and the
    connection's 'LastRequest' has an 'SMB2_CREATE' entry, the FileID of
    that entry is used instead.  For a regular file handle up to 'Length'
    bytes are read from the requested offset; for the pipe descriptor up to
    'Length' bytes are received from the stored socket.

    Returns a tuple ([respSMBCommand], None, errorCode).  errorCode is
    STATUS_INVALID_HANDLE for an unknown FileID and STATUS_ACCESS_DENIED
    when any exception occurs while reading (the exception is logged).
    """
    connData = smbServer.getConnectionData(connId)

    respSMBCommand = smb2.SMB2Read_Response()
    readRequest = smb2.SMB2Read(recvPacket['Data'])

    respSMBCommand['Buffer'] = '\x00'

    fileID = str(readRequest['FileID'])
    # 'in' replaces dict.has_key(), which no longer exists on Python 3.
    if fileID == '\xff'*16 and 'SMB2_CREATE' in connData['LastRequest']:
        # Let's take the data from the lastRequest
        fileID = connData['LastRequest']['SMB2_CREATE']['FileID']

    if fileID in connData['OpenedFiles']:
        fileHandle = connData['OpenedFiles'][fileID]['FileHandle']
        errorCode = 0
        try:
            if fileHandle != PIPE_FILE_DESCRIPTOR:
                offset = readRequest['Offset']
                os.lseek(fileHandle, offset, os.SEEK_SET)
                content = os.read(fileHandle, readRequest['Length'])
            else:
                sock = connData['OpenedFiles'][fileID]['Socket']
                content = sock.recv(readRequest['Length'])

            respSMBCommand['DataOffset'] = 0x50
            respSMBCommand['DataLength'] = len(content)
            respSMBCommand['DataRemaining'] = 0
            respSMBCommand['Buffer'] = content
        except Exception as e:
            smbServer.log('SMB2_READ: %s ' % e, logging.ERROR)
            errorCode = STATUS_ACCESS_DENIED
    else:
        errorCode = STATUS_INVALID_HANDLE

    smbServer.setConnectionData(connId, connData)
    return [respSMBCommand], None, errorCode
_fileobjectposix.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def seekable(self):
    """Return whether the underlying descriptor supports seeking.

    The answer is probed once with a no-op lseek() and then cached in
    self._seekable.
    """
    if self._seekable is not None:
        return self._seekable

    try:
        # Seeking zero bytes from the current position moves nothing; it
        # only fails when the descriptor cannot seek.
        os.lseek(self._fileno, 0, os.SEEK_CUR)
    except OSError:
        self._seekable = False
    else:
        self._seekable = True
    return self._seekable
_fileobjectposix.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=0):
    """Reposition self._fileno with os.lseek() and return the new offset."""
    new_position = os.lseek(self._fileno, offset, whence)
    return new_position
_fileobjectposix.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def seekable(self):
    """Return whether the underlying descriptor supports seeking.

    The answer is probed once with a no-op lseek() and then cached in
    self._seekable.
    """
    if self._seekable is not None:
        return self._seekable

    try:
        # Seeking zero bytes from the current position moves nothing; it
        # only fails when the descriptor cannot seek.
        os.lseek(self._fileno, 0, os.SEEK_CUR)
    except OSError:
        self._seekable = False
    else:
        self._seekable = True
    return self._seekable
_fileobjectposix.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def seek(self, offset, whence=0):
    """Reposition self._fileno with os.lseek() and return the new offset."""
    new_position = os.lseek(self._fileno, offset, whence)
    return new_position


问题


面经


文章

微信
公众号

扫码关注公众号