def __is_or_has_file(self, data):
'''
Figure out if we have been given a file-like object as one of the inputs to the function that called this.
Is a bit clunky because 'file' doesn't exist as a bare-word type check in Python 3 and built in file objects
are not instances of io.<anything> in Python 2
https://stackoverflow.com/questions/1661262/check-if-object-is-file-like-in-python
Returns:
Boolean - True if we have a file-like object
'''
if (hasattr(data, 'file')):
data = data.file
try:
return isinstance(data, file)
except NameError:
from io import IOBase
return isinstance(data, IOBase)
python类IOBase()的实例源码
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
"""Execute a prepared MySQL statement"""
parameters = list(parameters)
long_data_used = {}
if data:
for param_id, _ in enumerate(parameters):
if isinstance(data[param_id], IOBase):
binary = True
try:
binary = 'b' not in data[param_id].mode
except AttributeError:
pass
self.cmd_stmt_send_long_data(statement_id, param_id,
data[param_id])
long_data_used[param_id] = (binary,)
execute_packet = self._protocol.make_stmt_execute(
statement_id, data, tuple(parameters), flags,
long_data_used, self.charset)
packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
result = self._handle_binary_result(packet)
return result
def outputpapertemplate(self, dest, listchar, output=None):
if output == None:
output = PyPDF2.PdfFileWriter()
while listchar:
iopage = self.outputtemplateonepage(listchar)
page = PyPDF2.PdfFileReader(iopage)
output.addPage(page.getPage(0))
if dest != None:
if isinstance(dest, str): # when dest is a file path
destdir = os.path.dirname(dest)
if destdir != '' and not os.path.isdir(destdir):
os.makedirs(destdir)
with open(dest, "wb") as w:
output.write(w)
else: # when dest is io.IOBase
output.write(dest)
else:
return output
def __init__(self, record_type=None, record_name=None, data=None):
self._message_begin = self._message_end = False
self._type = self._name = self._data = ''
if not (record_type is None and record_name is None):
self.type = record_type if record_type is not None else 'unknown'
if record_name is not None:
self.name = record_name
if data is not None:
self.data = data
elif data is not None:
if isinstance(data, (bytearray, str)):
data = io.BytesIO(data)
if isinstance(data, io.IOBase):
self._read(data)
else:
raise TypeError("invalid data argument type")
def __init__(self, f, summary=True):
"""Args:
+ f: Either a file name or a seekable binary stream.
+ summary: If True, call self.read_summary().
"""
super().__init__(0)
if isinstance(f, IOBase):
self.stream = f
else:
self.stream = open(f, 'rb')
self.stream.seek(0, SEEK_END)
self.stream_size = self.stream.tell()
self.stream.seek(0, SEEK_SET)
if summary:
self.read_summary()
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
"""Execute a prepared MySQL statement"""
parameters = list(parameters)
long_data_used = {}
if data:
for param_id, _ in enumerate(parameters):
if isinstance(data[param_id], IOBase):
binary = True
try:
binary = 'b' not in data[param_id].mode
except AttributeError:
pass
self.cmd_stmt_send_long_data(statement_id, param_id,
data[param_id])
long_data_used[param_id] = (binary,)
execute_packet = self._protocol.make_stmt_execute(
statement_id, data, tuple(parameters), flags,
long_data_used, self.charset)
packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
result = self._handle_binary_result(packet)
return result
def _find_options_in_meta(self, content):
"""Reads 'content' and extracts options encoded in HTML meta tags
:param content: str or file-like object - contains HTML to parse
returns:
dict: {config option: value}
"""
if (isinstance(content, io.IOBase)
or content.__class__.__name__ == 'StreamReaderWriter'):
content = content.read()
found = {}
for x in re.findall('<meta [^>]*>', content):
if re.search('name=["\']%s' % self.config.meta_tag_prefix, x):
name = re.findall('name=["\']%s([^"\']*)' %
self.config.meta_tag_prefix, x)[0]
found[name] = re.findall('content=["\']([^"\']*)', x)[0]
return found
def test_wrap_non_iobase():
class FakeFile:
def close(self): # pragma: no cover
pass
def write(self): # pragma: no cover
pass
wrapped = FakeFile()
assert not isinstance(wrapped, io.IOBase)
async_file = trio.wrap_file(wrapped)
assert isinstance(async_file, AsyncIOWrapper)
del FakeFile.write
with pytest.raises(TypeError):
trio.wrap_file(FakeFile())
def __init__(self, record_type=None, record_name=None, data=None):
self._message_begin = self._message_end = False
self._type = self._name = self._data = ''
if not (record_type is None and record_name is None):
self.type = record_type if record_type is not None else 'unknown'
if record_name is not None:
self.name = record_name
if data is not None:
self.data = data
elif data is not None:
if isinstance(data, (bytearray, str)):
data = io.BytesIO(data)
if isinstance(data, io.IOBase):
self._read(data)
else:
raise TypeError("invalid data argument type")
def wrap_file_object(fileobj):
"""Handle differences in Python 2 and 3 around writing bytes."""
# If it's not an instance of IOBase, we're probably using Python 2 and
# that is less finnicky about writing text versus bytes to a file.
if not isinstance(fileobj, io.IOBase):
return fileobj
# At this point we're using Python 3 and that will mangle text written to
# a file written in bytes mode. So, let's check if the file can handle
# text as opposed to bytes.
if isinstance(fileobj, io.TextIOBase):
return fileobj
# Finally, we've determined that the fileobj passed in cannot handle text,
# so we use TextIOWrapper to handle the conversion for us.
return io.TextIOWrapper(fileobj)
def get_gramet_image_url(url_or_fp):
img_src = ''
if isinstance(url_or_fp, io.IOBase):
# noinspection PyUnresolvedReferences
data = url_or_fp.read()
u = urlsplit(OGIMET_URL)
else:
u = urlsplit(url_or_fp)
import requests
r = requests.get(url_or_fp)
data = r.text
if data:
m = re.search(r'<img src="([^"]+/gramet_[^"]+)"', data)
if m:
img_src = "{url.scheme}://{url.netloc}{path}".format(
url=u, path=m.group(1))
return img_src
def copy_file_data(src_file, dst_file, chunk_size=None):
"""Copy data from one file object to another.
Arguments:
src_file (io.IOBase): File open for reading.
dst_file (io.IOBase): File open for writing.
chunk_size (int, optional): Number of bytes to copy at
a time (or `None` to use sensible default).
"""
chunk_size = chunk_size or io.DEFAULT_BUFFER_SIZE
read = src_file.read
write = dst_file.write
# The 'or None' is so that it works with binary and text files
for chunk in iter(lambda: read(chunk_size) or None, None):
write(chunk)
def _find_options_in_meta(self, content):
"""Reads 'content' and extracts options encoded in HTML meta tags
:param content: str or file-like object - contains HTML to parse
returns:
dict: {config option: value}
"""
if (isinstance(content, io.IOBase)
or content.__class__.__name__ == 'StreamReaderWriter'):
content = content.read()
found = {}
for x in re.findall('<meta [^>]*>', content):
if re.search('name=["\']%s' % self.configuration.meta_tag_prefix, x):
name = re.findall('name=["\']%s([^"\']*)' %
self.configuration.meta_tag_prefix, x)[0]
found[name] = re.findall('content=["\']([^"\']*)', x)[0]
return found
def _upload_media_py3(self, media_type, media_file, extension=''):
if isinstance(media_file, io.IOBase) and hasattr(media_file, 'name'):
extension = media_file.name.split('.')[-1].lower()
if not is_allowed_extension(extension):
raise ValueError('Invalid file type.')
filename = media_file.name
elif isinstance(media_file, io.BytesIO):
extension = extension.lower()
if not is_allowed_extension(extension):
raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
filename = 'temp.' + extension
else:
raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')
return self.request.post(
url='https://api.weixin.qq.com/cgi-bin/media/upload',
params={
'type': media_type,
},
files={
'media': (filename, media_file, convert_ext_to_mime(extension))
}
)
def truncate(self, size=None):
"""
Resize the stream to the given size in bytes (or the current position
if size is not specified). This resizing can extend or reduce the
current file size. The new file size is returned.
In prior versions of picamera, truncation also changed the position of
the stream (because prior versions of these stream classes were
non-seekable). This functionality is now deprecated; scripts should
use :meth:`~io.IOBase.seek` and :meth:`truncate` as one would with
regular :class:`~io.BytesIO` instances.
"""
if size is not None:
warnings.warn(
PiCameraDeprecated(
'This method changes the position of the stream to the '
'truncated length; this is deprecated functionality and '
'you should not rely on it (seek before or after truncate '
'to ensure position is consistent)'))
super(PiArrayOutput, self).truncate(size)
if size is not None:
self.seek(size)
def test_good_response(self, resolwe_mock, requests_mock, os_mock, open_mock):
resolwe_mock.configure_mock(**self.config)
os_mock.path.isfile.return_value = True
# When mocking open one wants it to return a "file-like" mock: (spec=io.IOBase)
mock_open.return_value = MagicMock(spec=io.IOBase)
requests_mock.get.return_value = MagicMock(ok=True,
**{'iter_content.return_value': range(3)})
Resolwe._download_files(resolwe_mock, self.file_list)
self.assertEqual(resolwe_mock.logger.info.call_count, 3)
# This asserts may seem wierd. To check what is happening behind the scenes:
# print(open_mock.mock_calls)
self.assertEqual(open_mock.return_value.__enter__.return_value.write.call_count, 6)
# Why 6? 2 files in self.file_list, each downloads 3 chunks (defined in response mock)
def add_fields(self, *fields):
to_add = list(fields)
while to_add:
rec = to_add.pop(0)
if isinstance(rec, io.IOBase):
k = guess_filename(rec, 'unknown')
self.add_field(k, rec)
elif isinstance(rec, (MultiDictProxy, MultiDict)):
to_add.extend(rec.items())
elif isinstance(rec, (list, tuple)) and len(rec) == 2:
k, fp = rec
self.add_field(k, fp)
else:
raise TypeError('Only io.IOBase, multidict and (name, file) '
'pairs allowed, use .add_field() for passing '
'more complex parameters, got {!r}'
.format(rec))
def __init__(self, obj, headers=None, *, chunk_size=8192):
if headers is None:
headers = CIMultiDict()
elif not isinstance(headers, CIMultiDict):
headers = CIMultiDict(headers)
self.obj = obj
self.headers = headers
self._chunk_size = chunk_size
self._fill_headers_with_defaults()
self._serialize_map = {
bytes: self._serialize_bytes,
str: self._serialize_str,
io.IOBase: self._serialize_io,
MultipartWriter: self._serialize_multipart,
('application', 'json'): self._serialize_json,
('application', 'x-www-form-urlencoded'): self._serialize_form
}
def _guess_content_length(self, obj):
if isinstance(obj, bytes):
return len(obj)
elif isinstance(obj, str):
*_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
charset = params.get('charset', 'us-ascii')
return len(obj.encode(charset))
elif isinstance(obj, io.StringIO):
*_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
charset = params.get('charset', 'us-ascii')
return len(obj.getvalue().encode(charset)) - obj.tell()
elif isinstance(obj, io.BytesIO):
return len(obj.getvalue()) - obj.tell()
elif isinstance(obj, io.IOBase):
try:
return os.fstat(obj.fileno()).st_size - obj.tell()
except (AttributeError, OSError):
return None
else:
return None
def upload(self, file, name: str=None, path: str=None) -> StackFile:
"""
Upload a file to Stack
:param file: IO pointer or string containing a path to a file
:param name: Custom name which will be used on the remote server, defaults to file.name
:param path: Path to upload it to, defaults to current working directory
:return: Instance of a stack file
"""
if not path:
path = self.__cwd
if isinstance(file, IOBase):
return self.__upload(file=file, path=path, name=name)
if isinstance(file, str):
with open(file, "rb") as fd:
return self.__upload(file=fd, path=path, name=name)
raise StackException("File should either be a path to a file on disk or an IO type, got: {}".format(type(file)))
def analyse_file(self, filename, token=None, language=None):
token = token or self.token
if token is None:
raise RecastError("Token is missing")
language = language or self.language
filename = open(filename, 'rb') if not isinstance(filename, io.IOBase) else filename
body = {'voice': filename}
if language is not None:
body['language'] = language
response = requests.post(
Utils.REQUEST_ENDPOINT,
files=body,
headers={'Authorization': "Token {}".format(token)}
)
if response.status_code != requests.codes.ok:
raise RecastError(response.json().get('message', ''))
return Response(response.json()['results'])
def close(self):
"""Delete the IO object and close the input file and the output file"""
if self.__closed:
# avoid double close
return
deleted = False
try:
# on posix, one can remove a file while it's opend by a process
# the file then will be not visable to others, but process still have the file descriptor
# it is recommand to remove temp file before close it on posix to avoid race
# on nt, it will just fail and raise OSError so that after closing remove it again
self.__del_files()
deleted = True
except OSError:
pass
if isinstance(self.input_file, IOBase):
self.input_file.close()
if isinstance(self.output_file, IOBase):
self.output_file.close()
if not deleted:
self.__del_files()
self.__closed = True
def __new__(cls, **kwargs):
"""Patch for abstractmethod-like enforcement in io.IOBase grandchildren."""
if (
not (hasattr(cls, '_read_bytes') and callable(cls._read_bytes))
or not (hasattr(cls, '_prep_message') and callable(cls._read_bytes))
or not hasattr(cls, '_config_class')
):
raise TypeError("Can't instantiate abstract class {}".format(cls.__name__))
instance = super(_EncryptionStream, cls).__new__(cls)
config = kwargs.pop('config', None)
if not isinstance(config, instance._config_class): # pylint: disable=protected-access
config = instance._config_class(**kwargs) # pylint: disable=protected-access
instance.config = config
instance.bytes_read = 0
instance.output_buffer = b''
instance._message_prepped = False # pylint: disable=protected-access
instance.source_stream = instance.config.source
instance._stream_length = instance.config.source_length # pylint: disable=protected-access
return instance
def deserialize(data):
if isinstance(data, IOBase):
try:
data.seek(0)
except UnsupportedOperation:
pass
if hasattr(data, 'readall'):
data = data.readall()
else:
data = data.read()
if isinstance(data, bytes):
data = str(data, encoding='utf-8')
if isinstance(data, str):
try:
data = json.loads(data, object_hook=collections.OrderedDict)
except json.JSONDecodeError as e:
data = yaml.load(data)
return data
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
"""Execute a prepared MySQL statement"""
parameters = list(parameters)
long_data_used = {}
if data:
for param_id, _ in enumerate(parameters):
if isinstance(data[param_id], IOBase):
binary = True
try:
binary = 'b' not in data[param_id].mode
except AttributeError:
pass
self.cmd_stmt_send_long_data(statement_id, param_id,
data[param_id])
long_data_used[param_id] = (binary,)
execute_packet = self._protocol.make_stmt_execute(
statement_id, data, tuple(parameters), flags,
long_data_used, self.charset)
packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
result = self._handle_binary_result(packet)
return result
def is_file(ob):
return isinstance(ob, io.IOBase)
def _is_filelike_object(f):
try:
return isinstance(f, (file, io.IOBase))
except NameError:
# 'file' is not a class in python3
return isinstance(f, io.IOBase)
def _is_file(f):
return isinstance(f, io.IOBase)
def put(self, data: Union[bytes, IOBase], *, format: str,
graph: Optional[IRI] = None):
async with self._crud_request("PUT", graph=graph, data=data,
content_type=format) as resp:
resp.raise_for_status()
def post(self, data: Union[bytes, IOBase], *, format: str,
graph: Optional[IRI] = None):
async with self._crud_request("POST", graph=graph, data=data,
content_type=format) as resp:
resp.raise_for_status()