def super_len(o):
    """Return the number of bytes that remain to be read from ``o``.

    The total size comes from the first of these that ``o`` provides:
    ``__len__``, a ``len`` attribute, ``getvalue()`` (e.g. BytesIO), or the
    ``fstat`` size of the descriptor returned by ``fileno()``. When ``o``
    has ``tell()``, the current position is subtracted so a partially
    consumed object only reports what is left.

    :param o: a sized object or a file-like object.
    :returns: the remaining length, never negative; ``0`` when no size can
        be determined or the position cannot be queried.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError: a wrapper may advertise ``fileno`` but forward
            # to an underlying object that does not have one.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode. ``mode``
            # may be missing or not a string (e.g. gzip.GzipFile uses an
            # integer), in which case there is nothing to check.
            mode = getattr(o, 'mode', None)
            if isinstance(mode, str) and 'b' not in mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # Unseekable streams (pipes, stdin, ...) cannot report a
            # position. Report nothing remaining rather than a length that
            # cannot be verified, so the caller can stream the body instead.
            current_position = total_length

    return max(0, total_length - current_position)
# Example source code using the Python class UnsupportedOperation()
def test_writing_to_stdout_is_diverted_if_broken(self, sys):
    # When sys.stdout cannot provide a file descriptor, the wrapped callable
    # must receive a substitute stdout instead of None or sys.stdout itself.
    # NOTE(review): ``sys`` is presumably a mock injected by a patch decorator
    # outside this view.
    sys.stdout.fileno.side_effect = io.UnsupportedOperation

    def check_kwargs(**passed):
        assert passed['stdin'] is None
        replacement = passed['stdout']
        assert replacement is not None
        assert replacement is not sys.stdout

    wrapped = utils.wrap_subprocess_call(check_kwargs)
    wrapped(stdin=None, stdout=None)
def test_writing_to_stdout_is_not_diverted_if_wrapstdout_is_false(self, sys):
    # With wrap_stdout=False the wrapper must leave stdout untouched even
    # though sys.stdout.fileno() fails.
    sys.stdout.fileno.side_effect = io.UnsupportedOperation

    def check_kwargs(**passed):
        assert passed['stdin'] is None
        assert passed['stdout'] is None

    wrapped = utils.wrap_subprocess_call(check_kwargs, wrap_stdout=False)
    wrapped(stdin=None, stdout=None)
def test_writing_to_stderr_is_diverted_if_broken(self, sys):
    # When sys.stderr cannot provide a file descriptor, the wrapped callable
    # must receive a substitute stderr.
    def sample(**kwargs):
        assert kwargs['stderr'] is not None
        # Compare against sys.stderr: the stream under test is stderr, so a
        # comparison with sys.stdout could never catch a missing diversion.
        assert kwargs['stderr'] is not sys.stderr

    sys.stderr.fileno.side_effect = io.UnsupportedOperation
    utils.wrap_subprocess_call(sample)(stderr=None)
def test_reading_from_stdin_is_not_diverted(self, sys):
    # A failing sys.stdin.fileno() must not cause any stream to be replaced:
    # both stdin and stdout reach the wrapped callable as None.
    sys.stdin.fileno.side_effect = io.UnsupportedOperation

    def check_kwargs(**passed):
        assert passed['stdin'] is None
        assert passed['stdout'] is None

    wrapped = utils.wrap_subprocess_call(check_kwargs)
    wrapped(stdin=None, stdout=None)
def test_arbitrary_outputs_are_not_replaced_even_if_stdout_is_broken(self, sys):
    # An explicitly supplied stdout object must be passed through unchanged,
    # even though sys.stdout.fileno() fails.
    explicit_target = io.StringIO()
    sys.stdout.fileno.side_effect = io.UnsupportedOperation

    def check_kwargs(**passed):
        assert passed['stdin'] is None
        assert passed['stdout'] is explicit_target

    wrapped = utils.wrap_subprocess_call(check_kwargs)
    wrapped(stdin=None, stdout=explicit_target)
def test_writing_to_broken_stderr_is_output_after_returning(self, sys):
    # Output written to the substitute stderr must reach sys.stderr only once
    # the wrapped callable has returned.
    sys.stderr.fileno.side_effect = io.UnsupportedOperation

    def emit_output(**passed):
        substitute = passed['stderr']
        substitute.write('Output')
        # The output isn't written until we return
        sys.stderr.write.assert_not_called()

    wrapped = utils.wrap_subprocess_call(emit_output)
    wrapped(stderr=None)
    sys.stderr.write.assert_called_once_with('Output')
def _replay_captured(captured, stream):
    """Copy everything written to ``captured`` into ``stream`` and close it."""
    try:
        captured.seek(0)
        data = captured.read()
    finally:
        # Always release the temporary file; closing a NamedTemporaryFile
        # created with the default arguments also deletes it.
        captured.close()
    try:
        stream.write(data)
    except TypeError:
        # Python 3 text streams reject the bytes read back from the binary
        # temporary file; decode them and try again. Anything that is
        # already ``str`` failed for a different reason, so re-raise.
        if isinstance(data, str):
            raise
        encoding = getattr(stream, 'encoding', None)
        if not isinstance(encoding, str):
            encoding = 'utf-8'
        stream.write(data.decode(encoding, 'replace'))


def wrap_subprocess_call(func, wrap_stdout=True):
    """Wrap ``func`` so it works when sys.stdout/sys.stderr have no fileno().

    If the caller did not supply ``stdout`` (or ``stderr``) and the matching
    ``sys`` stream raises ``io.UnsupportedOperation`` from ``fileno()``, a
    temporary file is passed to ``func`` in its place. After ``func``
    finishes, whether it returned or raised, the temporary file's contents
    are written to the real stream and the file is closed.

    :param func: callable accepting ``stdout``/``stderr`` keyword arguments.
    :param wrap_stdout: when false, ``stdout`` is never substituted.
    :returns: the wrapping function; it returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    def wrapper(*popenargs, **kwargs):
        captured_out = None
        captured_err = None

        if kwargs.get('stdout', None) is None and wrap_stdout:
            try:
                sys.stdout.fileno()
            except io.UnsupportedOperation:
                captured_out = tempfile.NamedTemporaryFile()
                kwargs['stdout'] = captured_out

        if kwargs.get('stderr', None) is None:
            try:
                sys.stderr.fileno()
            except io.UnsupportedOperation:
                captured_err = tempfile.NamedTemporaryFile()
                kwargs['stderr'] = captured_err

        try:
            return func(*popenargs, **kwargs)
        finally:
            # Nested so that the stderr capture is still replayed and closed
            # even if replaying stdout fails.
            try:
                if captured_out is not None:
                    _replay_captured(captured_out, sys.stdout)
            finally:
                if captured_err is not None:
                    _replay_captured(captured_err, sys.stderr)
    return wrapper
def super_len(o):
    """Return the number of bytes that remain to be read from ``o``.

    The total size comes from the first of these that ``o`` provides:
    ``__len__``, a ``len`` attribute, ``getvalue()`` (e.g. BytesIO), or the
    ``fstat`` size of the descriptor returned by ``fileno()``. When ``o``
    has ``tell()``, the current position is subtracted so a partially
    consumed object only reports what is left.

    :param o: a sized object or a file-like object.
    :returns: the remaining length, never negative; ``0`` when no size can
        be determined or the position cannot be queried.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError: a wrapper may advertise ``fileno`` but forward
            # to an underlying object that does not have one.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode. ``mode``
            # may be missing or not a string (e.g. gzip.GzipFile uses an
            # integer), in which case there is nothing to check.
            mode = getattr(o, 'mode', None)
            if isinstance(mode, str) and 'b' not in mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # Unseekable streams (pipes, stdin, ...) cannot report a
            # position. Report nothing remaining rather than a length that
            # cannot be verified, so the caller can stream the body instead.
            current_position = total_length

    return max(0, total_length - current_position)
def super_len(o):
    """Return the number of bytes that remain to be read from ``o``.

    The total size comes from the first of these that ``o`` provides:
    ``__len__``, a ``len`` attribute, ``getvalue()`` (e.g. BytesIO), or the
    ``fstat`` size of the descriptor returned by ``fileno()``. When ``o``
    has ``tell()``, the current position is subtracted so a partially
    consumed object only reports what is left.

    :param o: a sized object or a file-like object.
    :returns: the remaining length, never negative; ``0`` when no size can
        be determined or the position cannot be queried.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError: a wrapper may advertise ``fileno`` but forward
            # to an underlying object that does not have one.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode. ``mode``
            # may be missing or not a string (e.g. gzip.GzipFile uses an
            # integer), in which case there is nothing to check.
            mode = getattr(o, 'mode', None)
            if isinstance(mode, str) and 'b' not in mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # Unseekable streams (pipes, stdin, ...) cannot report a
            # position. Report nothing remaining rather than a length that
            # cannot be verified, so the caller can stream the body instead.
            current_position = total_length

    return max(0, total_length - current_position)
def super_len(o):
    """Return the number of bytes that remain to be read from ``o``.

    The total size comes from the first of these that ``o`` provides:
    ``__len__``, a ``len`` attribute, ``getvalue()`` (e.g. BytesIO), or the
    ``fstat`` size of the descriptor returned by ``fileno()``. When ``o``
    has ``tell()``, the current position is subtracted so a partially
    consumed object only reports what is left.

    :param o: a sized object or a file-like object.
    :returns: the remaining length, never negative; ``0`` when no size can
        be determined or the position cannot be queried.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError: a wrapper may advertise ``fileno`` but forward
            # to an underlying object that does not have one.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode. ``mode``
            # may be missing or not a string (e.g. gzip.GzipFile uses an
            # integer), in which case there is nothing to check.
            mode = getattr(o, 'mode', None)
            if isinstance(mode, str) and 'b' not in mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # Unseekable streams (pipes, stdin, ...) cannot report a
            # position. Report nothing remaining rather than a length that
            # cannot be verified, so the caller can stream the body instead.
            current_position = total_length

    return max(0, total_length - current_position)
def open_stream(self, file_stream, **keywords):
    """Open an ods file from a stream, buffering it first if it cannot seek."""
    if hasattr(file_stream, 'seek'):
        stream = file_stream
    else:
        # python 2
        # zipfile (used by odfpy) performs a seek, but a stream obtained
        # from urllib cannot seek, so read it into memory.
        stream = BytesIO(file_stream.read())

    try:
        stream.seek(0)
    except UnsupportedOperation:
        # python 3
        # the stream has a seek attribute that refuses to work; buffer it.
        stream = BytesIO(stream.read())

    BookReader.open_stream(self, stream, **keywords)
    self._load_from_memory()
def has_fileno(obj):
    """Return True when ``obj.fileno()`` exists and can be called.

    :param obj: any object, typically file-like.
    :returns: ``False`` if ``obj`` has no ``fileno`` attribute or calling it
        fails (in-memory streams such as BytesIO, closed files, ...);
        ``True`` otherwise.
    """
    if not hasattr(obj, "fileno"):
        return False

    # check BytesIO case and maybe others
    try:
        obj.fileno()
    except (AttributeError, IOError, ValueError, io.UnsupportedOperation):
        # ValueError: a closed real file raises "I/O operation on closed
        # file" here, which means no usable descriptor either.
        return False

    return True
def super_len(o):
    """Return the number of bytes that remain to be read from ``o``.

    The total size comes from the first of these that ``o`` provides:
    ``__len__``, a ``len`` attribute, ``getvalue()`` (e.g. BytesIO), or the
    ``fstat`` size of the descriptor returned by ``fileno()``. When ``o``
    has ``tell()``, the current position is subtracted so a partially
    consumed object only reports what is left.

    :param o: a sized object or a file-like object.
    :returns: the remaining length, never negative; ``0`` when no size can
        be determined or the position cannot be queried.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError: a wrapper may advertise ``fileno`` but forward
            # to an underlying object that does not have one.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode. ``mode``
            # may be missing or not a string (e.g. gzip.GzipFile uses an
            # integer), in which case there is nothing to check.
            mode = getattr(o, 'mode', None)
            if isinstance(mode, str) and 'b' not in mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # Unseekable streams (pipes, stdin, ...) cannot report a
            # position. Report nothing remaining rather than a length that
            # cannot be verified, so the caller can stream the body instead.
            current_position = total_length

    return max(0, total_length - current_position)
def super_len(o):
    """Return the number of bytes that remain to be read from ``o``.

    The total size comes from the first of these that ``o`` provides:
    ``__len__``, a ``len`` attribute, ``getvalue()`` (e.g. BytesIO), or the
    ``fstat`` size of the descriptor returned by ``fileno()``. When ``o``
    has ``tell()``, the current position is subtracted so a partially
    consumed object only reports what is left.

    :param o: a sized object or a file-like object.
    :returns: the remaining length, never negative; ``0`` when no size can
        be determined or the position cannot be queried.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError: a wrapper may advertise ``fileno`` but forward
            # to an underlying object that does not have one.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode. ``mode``
            # may be missing or not a string (e.g. gzip.GzipFile uses an
            # integer), in which case there is nothing to check.
            mode = getattr(o, 'mode', None)
            if isinstance(mode, str) and 'b' not in mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # Unseekable streams (pipes, stdin, ...) cannot report a
            # position. Report nothing remaining rather than a length that
            # cannot be verified, so the caller can stream the body instead.
            current_position = total_length

    return max(0, total_length - current_position)
def super_len(o):
    """Return the number of bytes that remain to be read from ``o``.

    The total size comes from the first of these that ``o`` provides:
    ``__len__``, a ``len`` attribute, ``getvalue()`` (e.g. BytesIO), or the
    ``fstat`` size of the descriptor returned by ``fileno()``. When ``o``
    has ``tell()``, the current position is subtracted so a partially
    consumed object only reports what is left.

    :param o: a sized object or a file-like object.
    :returns: the remaining length, never negative; ``0`` when no size can
        be determined or the position cannot be queried.
    """
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except (io.UnsupportedOperation, AttributeError):
            # AttributeError: a wrapper may advertise ``fileno`` but forward
            # to an underlying object that does not have one.
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode. ``mode``
            # may be missing or not a string (e.g. gzip.GzipFile uses an
            # integer), in which case there is nothing to check.
            mode = getattr(o, 'mode', None)
            if isinstance(mode, str) and 'b' not in mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # Unseekable streams (pipes, stdin, ...) cannot report a
            # position. Report nothing remaining rather than a length that
            # cannot be verified, so the caller can stream the body instead.
            current_position = total_length

    return max(0, total_length - current_position)
def __read(self, n):
    """Return ``_read(self._fileno, n)``, retrying after ignorable errors.

    Raises ``UnsupportedOperation('read')`` when the object is not readable.
    An IOError/OSError whose first argument is in ``ignored_errors`` is
    swallowed; the call is retried after waiting on the hub for the read
    event. Any other such error propagates.
    """
    if not self._readable:
        raise UnsupportedOperation('read')

    while True:
        try:
            return _read(self._fileno, n)
        except (IOError, OSError) as err:
            retryable = err.args[0] in ignored_errors
            if not retryable:
                raise
        # Only reached after an ignorable error: wait, then try again.
        self.hub.wait(self._read_event)
def write(self, b):
    """Return ``_write(self._fileno, b)``, retrying after ignorable errors.

    Raises ``UnsupportedOperation('write')`` when the object is not
    writable. An IOError/OSError whose first argument is in
    ``ignored_errors`` is swallowed; the call is retried after waiting on
    the hub for the write event. Any other such error propagates.
    """
    if not self._writable:
        raise UnsupportedOperation('write')

    while True:
        try:
            return _write(self._fileno, b)
        except (IOError, OSError) as err:
            retryable = err.args[0] in ignored_errors
            if not retryable:
                raise
        # Only reached after an ignorable error: wait, then try again.
        self.hub.wait(self._write_event)
def __read(self, n):
    """Return ``_read(self._fileno, n)``, retrying after ignorable errors.

    Raises ``UnsupportedOperation('read')`` when the object is not readable.
    An IOError/OSError whose first argument is in ``ignored_errors`` is
    swallowed; the call is retried after waiting on the hub for the read
    event. Any other such error propagates.
    """
    if not self._readable:
        raise UnsupportedOperation('read')

    while True:
        try:
            return _read(self._fileno, n)
        except (IOError, OSError) as err:
            retryable = err.args[0] in ignored_errors
            if not retryable:
                raise
        # Only reached after an ignorable error: wait, then try again.
        self.hub.wait(self._read_event)
def write(self, b):
    """Return ``_write(self._fileno, b)``, retrying after ignorable errors.

    Raises ``UnsupportedOperation('write')`` when the object is not
    writable. An IOError/OSError whose first argument is in
    ``ignored_errors`` is swallowed; the call is retried after waiting on
    the hub for the write event. Any other such error propagates.
    """
    if not self._writable:
        raise UnsupportedOperation('write')

    while True:
        try:
            return _write(self._fileno, b)
        except (IOError, OSError) as err:
            retryable = err.args[0] in ignored_errors
            if not retryable:
                raise
        # Only reached after an ignorable error: wait, then try again.
        self.hub.wait(self._write_event)