python类IOBase()的实例源码

ecs.py 文件源码 项目:deployfish 作者: caltechads 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __is_or_has_file(self, data):
        '''
        Figure out if we have been given a file-like object as one of the inputs to the function that called this.
        Is a bit clunky because 'file' doesn't exist as a bare-word type check in Python 3 and built in file objects
        are not instances of io.<anything> in Python 2

        https://stackoverflow.com/questions/1661262/check-if-object-is-file-like-in-python
        Returns:
            Boolean - True if we have a file-like object
        '''
        if (hasattr(data, 'file')):
            data = data.file

        try:
            return isinstance(data, file)
        except NameError:
            from io import IOBase
            return isinstance(data, IOBase)
connection.py 文件源码 项目:python-mysql-pool 作者: LuciferJack 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
        """Execute a prepared MySQL statement"""
        parameters = list(parameters)
        long_data_used = {}

        if data:
            for param_id, _ in enumerate(parameters):
                if isinstance(data[param_id], IOBase):
                    binary = True
                    try:
                        binary = 'b' not in data[param_id].mode
                    except AttributeError:
                        pass
                    self.cmd_stmt_send_long_data(statement_id, param_id,
                                                 data[param_id])
                    long_data_used[param_id] = (binary,)

        execute_packet = self._protocol.make_stmt_execute(
            statement_id, data, tuple(parameters), flags,
            long_data_used, self.charset)
        packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
        result = self._handle_binary_result(packet)
        return result
tilecharbox.py 文件源码 项目:handfontgen 作者: nixeneko 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def outputpapertemplate(self, dest, listchar, output=None):
        if output == None:
            output = PyPDF2.PdfFileWriter()

        while listchar:
            iopage = self.outputtemplateonepage(listchar)
            page = PyPDF2.PdfFileReader(iopage)
            output.addPage(page.getPage(0))

        if dest != None:
            if isinstance(dest, str): # when dest is a file path
                destdir = os.path.dirname(dest)
                if destdir != '' and not os.path.isdir(destdir):
                    os.makedirs(destdir)
                with open(dest, "wb") as w:
                    output.write(w)
            else: # when dest is io.IOBase
                output.write(dest)
        else:
            return output
record.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, record_type=None, record_name=None, data=None):
        self._message_begin = self._message_end = False
        self._type = self._name = self._data = ''
        if not (record_type is None and record_name is None):
            self.type = record_type if record_type is not None else 'unknown'
            if record_name is not None:
                self.name = record_name
            if data is not None:
                self.data = data
        elif data is not None:
            if isinstance(data, (bytearray, str)):
                data = io.BytesIO(data)
            if isinstance(data, io.IOBase):
                self._read(data)
            else:
                raise TypeError("invalid data argument type")
container.py 文件源码 项目:python-ebml 作者: QBobWatson 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, f, summary=True):
        """Args:
         + f: Either a file name or a seekable binary stream.
         + summary: If True, call self.read_summary().
        """
        super().__init__(0)
        if isinstance(f, IOBase):
            self.stream = f
        else:
            self.stream = open(f, 'rb')
        self.stream.seek(0, SEEK_END)
        self.stream_size = self.stream.tell()
        self.stream.seek(0, SEEK_SET)

        if summary:
            self.read_summary()
connection.py 文件源码 项目:importacsv 作者: rasertux 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
        """Execute a prepared MySQL statement"""
        parameters = list(parameters)
        long_data_used = {}

        if data:
            for param_id, _ in enumerate(parameters):
                if isinstance(data[param_id], IOBase):
                    binary = True
                    try:
                        binary = 'b' not in data[param_id].mode
                    except AttributeError:
                        pass
                    self.cmd_stmt_send_long_data(statement_id, param_id,
                                                 data[param_id])
                    long_data_used[param_id] = (binary,)

        execute_packet = self._protocol.make_stmt_execute(
            statement_id, data, tuple(parameters), flags,
            long_data_used, self.charset)
        packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
        result = self._handle_binary_result(packet)
        return result
imgkit.py 文件源码 项目:imgkit 作者: jarrekk 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _find_options_in_meta(self, content):
        """Reads 'content' and extracts options encoded in HTML meta tags

        :param content: str or file-like object - contains HTML to parse

        returns:
          dict: {config option: value}
        """
        if (isinstance(content, io.IOBase)
            or content.__class__.__name__ == 'StreamReaderWriter'):
            content = content.read()

        found = {}

        for x in re.findall('<meta [^>]*>', content):
            if re.search('name=["\']%s' % self.config.meta_tag_prefix, x):
                name = re.findall('name=["\']%s([^"\']*)' %
                                  self.config.meta_tag_prefix, x)[0]
                found[name] = re.findall('content=["\']([^"\']*)', x)[0]

        return found
test_file_io.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_wrap_non_iobase():
    class FakeFile:
        def close(self):  # pragma: no cover
            pass

        def write(self):  # pragma: no cover
            pass

    wrapped = FakeFile()
    assert not isinstance(wrapped, io.IOBase)

    async_file = trio.wrap_file(wrapped)
    assert isinstance(async_file, AsyncIOWrapper)

    del FakeFile.write

    with pytest.raises(TypeError):
        trio.wrap_file(FakeFile())
record.py 文件源码 项目:bitpay-brick 作者: javgh 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, record_type=None, record_name=None, data=None):
        self._message_begin = self._message_end = False
        self._type = self._name = self._data = ''
        if not (record_type is None and record_name is None):
            self.type = record_type if record_type is not None else 'unknown'
            if record_name is not None:
                self.name = record_name
            if data is not None:
                self.data = data
        elif data is not None:
            if isinstance(data, (bytearray, str)):
                data = io.BytesIO(data)
            if isinstance(data, io.IOBase):
                self._read(data)
            else:
                raise TypeError("invalid data argument type")
utils.py 文件源码 项目:bandit-ss 作者: zeroSteiner 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def wrap_file_object(fileobj):
    """Handle differences in Python 2 and 3 around writing bytes."""
    # If it's not an instance of IOBase, we're probably using Python 2 and
    # that is less finnicky about writing text versus bytes to a file.
    if not isinstance(fileobj, io.IOBase):
        return fileobj

    # At this point we're using Python 3 and that will mangle text written to
    # a file written in bytes mode. So, let's check if the file can handle
    # text as opposed to bytes.
    if isinstance(fileobj, io.TextIOBase):
        return fileobj

    # Finally, we've determined that the fileobj passed in cannot handle text,
    # so we use TextIOWrapper to handle the conversion for us.
    return io.TextIOWrapper(fileobj)
ogimet.py 文件源码 项目:editolido 作者: flyingeek 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_gramet_image_url(url_or_fp):
    img_src = ''
    if isinstance(url_or_fp, io.IOBase):
        # noinspection PyUnresolvedReferences
        data = url_or_fp.read()
        u = urlsplit(OGIMET_URL)
    else:
        u = urlsplit(url_or_fp)
        import requests
        r = requests.get(url_or_fp)
        data = r.text
    if data:
        m = re.search(r'<img src="([^"]+/gramet_[^"]+)"', data)
        if m:
            img_src = "{url.scheme}://{url.netloc}{path}".format(
                url=u, path=m.group(1))
    return img_src
tools.py 文件源码 项目:pyfilesystem2 作者: PyFilesystem 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def copy_file_data(src_file, dst_file, chunk_size=None):
    """Copy data from one file object to another.

    Arguments:
        src_file (io.IOBase): File open for reading.
        dst_file (io.IOBase): File open for writing.
        chunk_size (int, optional): Number of bytes to copy at
            a time (or `None` to use sensible default).

    """
    chunk_size = chunk_size or io.DEFAULT_BUFFER_SIZE
    read = src_file.read
    write = dst_file.write
    # The 'or None' is so that it works with binary and text files
    for chunk in iter(lambda: read(chunk_size) or None, None):
        write(chunk)
pdfkit.py 文件源码 项目:Remarkable 作者: jamiemcg 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _find_options_in_meta(self, content):
        """Reads 'content' and extracts options encoded in HTML meta tags

        :param content: str or file-like object - contains HTML to parse

        returns:
          dict: {config option: value}
        """
        if (isinstance(content, io.IOBase)
                or content.__class__.__name__ == 'StreamReaderWriter'):
            content = content.read()

        found = {}

        for x in re.findall('<meta [^>]*>', content):
            if re.search('name=["\']%s' % self.configuration.meta_tag_prefix, x):
                name = re.findall('name=["\']%s([^"\']*)' %
                                  self.configuration.meta_tag_prefix, x)[0]
                found[name] = re.findall('content=["\']([^"\']*)', x)[0]

        return found
basic.py 文件源码 项目:wechat-async-sdk 作者: ihjmh 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _upload_media_py3(self, media_type, media_file, extension=''):
        if isinstance(media_file, io.IOBase) and hasattr(media_file, 'name'):
            extension = media_file.name.split('.')[-1].lower()
            if not is_allowed_extension(extension):
                raise ValueError('Invalid file type.')
            filename = media_file.name
        elif isinstance(media_file, io.BytesIO):
            extension = extension.lower()
            if not is_allowed_extension(extension):
                raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
            filename = 'temp.' + extension
        else:
            raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')

        return self.request.post(
            url='https://api.weixin.qq.com/cgi-bin/media/upload',
            params={
                'type': media_type,
            },
            files={
                'media': (filename, media_file, convert_ext_to_mime(extension))
            }
        )
array.py 文件源码 项目:ivport-v2 作者: ivmech 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def truncate(self, size=None):
        """
        Resize the stream to the given size in bytes (or the current position
        if size is not specified). This resizing can extend or reduce the
        current file size.  The new file size is returned.

        In prior versions of picamera, truncation also changed the position of
        the stream (because prior versions of these stream classes were
        non-seekable). This functionality is now deprecated; scripts should
        use :meth:`~io.IOBase.seek` and :meth:`truncate` as one would with
        regular :class:`~io.BytesIO` instances.
        """
        if size is not None:
            warnings.warn(
                PiCameraDeprecated(
                    'This method changes the position of the stream to the '
                    'truncated length; this is deprecated functionality and '
                    'you should not rely on it (seek before or after truncate '
                    'to ensure position is consistent)'))
        super(PiArrayOutput, self).truncate(size)
        if size is not None:
            self.seek(size)
test_resolwe.py 文件源码 项目:resolwe-bio-py 作者: genialis 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_good_response(self, resolwe_mock, requests_mock, os_mock, open_mock):
        resolwe_mock.configure_mock(**self.config)
        os_mock.path.isfile.return_value = True

        # When mocking open one wants it to return a "file-like" mock: (spec=io.IOBase)
        mock_open.return_value = MagicMock(spec=io.IOBase)

        requests_mock.get.return_value = MagicMock(ok=True,
                                                   **{'iter_content.return_value': range(3)})

        Resolwe._download_files(resolwe_mock, self.file_list)
        self.assertEqual(resolwe_mock.logger.info.call_count, 3)

        # This asserts may seem wierd. To check what is happening behind the scenes:
        # print(open_mock.mock_calls)
        self.assertEqual(open_mock.return_value.__enter__.return_value.write.call_count, 6)
        # Why 6? 2 files in self.file_list, each downloads 3 chunks (defined in response mock)
helpers.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_fields(self, *fields):
        to_add = list(fields)

        while to_add:
            rec = to_add.pop(0)

            if isinstance(rec, io.IOBase):
                k = guess_filename(rec, 'unknown')
                self.add_field(k, rec)

            elif isinstance(rec, (MultiDictProxy, MultiDict)):
                to_add.extend(rec.items())

            elif isinstance(rec, (list, tuple)) and len(rec) == 2:
                k, fp = rec
                self.add_field(k, fp)

            else:
                raise TypeError('Only io.IOBase, multidict and (name, file) '
                                'pairs allowed, use .add_field() for passing '
                                'more complex parameters, got {!r}'
                                .format(rec))
multipart.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, obj, headers=None, *, chunk_size=8192):
        if headers is None:
            headers = CIMultiDict()
        elif not isinstance(headers, CIMultiDict):
            headers = CIMultiDict(headers)

        self.obj = obj
        self.headers = headers
        self._chunk_size = chunk_size
        self._fill_headers_with_defaults()

        self._serialize_map = {
            bytes: self._serialize_bytes,
            str: self._serialize_str,
            io.IOBase: self._serialize_io,
            MultipartWriter: self._serialize_multipart,
            ('application', 'json'): self._serialize_json,
            ('application', 'x-www-form-urlencoded'): self._serialize_form
        }
multipart.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _guess_content_length(self, obj):
        if isinstance(obj, bytes):
            return len(obj)
        elif isinstance(obj, str):
            *_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
            charset = params.get('charset', 'us-ascii')
            return len(obj.encode(charset))
        elif isinstance(obj, io.StringIO):
            *_, params = parse_mimetype(self.headers.get(CONTENT_TYPE))
            charset = params.get('charset', 'us-ascii')
            return len(obj.getvalue().encode(charset)) - obj.tell()
        elif isinstance(obj, io.BytesIO):
            return len(obj.getvalue()) - obj.tell()
        elif isinstance(obj, io.IOBase):
            try:
                return os.fstat(obj.fileno()).st_size - obj.tell()
            except (AttributeError, OSError):
                return None
        else:
            return None
stack.py 文件源码 项目:TransIP-STACK-API 作者: Paradoxis 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def upload(self, file, name: str=None, path: str=None) -> StackFile:
        """
        Upload a file to Stack
        :param file: IO pointer or string containing a path to a file
        :param name: Custom name which will be used on the remote server, defaults to file.name
        :param path: Path to upload it to, defaults to current working directory
        :return: Instance of a stack file
        """
        if not path:
            path = self.__cwd

        if isinstance(file, IOBase):
            return self.__upload(file=file, path=path, name=name)

        if isinstance(file, str):
            with open(file, "rb") as fd:
                return self.__upload(file=fd, path=path, name=name)

        raise StackException("File should either be a path to a file on disk or an IO type, got: {}".format(type(file)))
analyse.py 文件源码 项目:SDK-python 作者: RecastAI 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def analyse_file(self, filename, token=None, language=None):
    token = token or self.token
    if token is None:
      raise RecastError("Token is missing")

    language = language or self.language

    filename = open(filename, 'rb') if not isinstance(filename, io.IOBase) else filename

    body = {'voice': filename}
    if language is not None:
      body['language'] = language

    response = requests.post(
      Utils.REQUEST_ENDPOINT,
      files=body,
      headers={'Authorization': "Token {}".format(token)}
    )
    if response.status_code != requests.codes.ok:
      raise RecastError(response.json().get('message', ''))

    return Response(response.json()['results'])
io.py 文件源码 项目:cyaron 作者: luogu-dev 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def close(self):
        """Delete the IO object and close the input file and the output file"""
        if self.__closed:
            # avoid double close
            return
        deleted = False
        try:
            # on posix, one can remove a file while it's opend by a process
            # the file then will be not visable to others, but process still have the file descriptor
            # it is recommand to remove temp file before close it on posix to avoid race
            # on nt, it will just fail and raise OSError so that after closing remove it again
            self.__del_files()
            deleted = True
        except OSError:
            pass
        if isinstance(self.input_file, IOBase):
            self.input_file.close()
        if isinstance(self.output_file, IOBase):
            self.output_file.close()
        if not deleted:
            self.__del_files()
        self.__closed = True
streaming_client.py 文件源码 项目:aws-encryption-sdk-python 作者: awslabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __new__(cls, **kwargs):
        """Patch for abstractmethod-like enforcement in io.IOBase grandchildren."""
        if (
                not (hasattr(cls, '_read_bytes') and callable(cls._read_bytes))
                or not (hasattr(cls, '_prep_message') and callable(cls._read_bytes))
                or not hasattr(cls, '_config_class')
        ):
            raise TypeError("Can't instantiate abstract class {}".format(cls.__name__))

        instance = super(_EncryptionStream, cls).__new__(cls)

        config = kwargs.pop('config', None)
        if not isinstance(config, instance._config_class):  # pylint: disable=protected-access
            config = instance._config_class(**kwargs)  # pylint: disable=protected-access
        instance.config = config

        instance.bytes_read = 0
        instance.output_buffer = b''
        instance._message_prepped = False  # pylint: disable=protected-access
        instance.source_stream = instance.config.source
        instance._stream_length = instance.config.source_length  # pylint: disable=protected-access

        return instance
model.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def deserialize(data):
    if isinstance(data, IOBase):
        try:
            data.seek(0)
        except UnsupportedOperation:
            pass
        if hasattr(data, 'readall'):
            data = data.readall()
        else:
            data = data.read()
    if isinstance(data, bytes):
        data = str(data, encoding='utf-8')
    if isinstance(data, str):
        try:
            data = json.loads(data, object_hook=collections.OrderedDict)
        except json.JSONDecodeError as e:
            data = yaml.load(data)
    return data
connection.py 文件源码 项目:Chorus 作者: DonaldBough 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def cmd_stmt_execute(self, statement_id, data=(), parameters=(), flags=0):
        """Execute a prepared MySQL statement"""
        parameters = list(parameters)
        long_data_used = {}

        if data:
            for param_id, _ in enumerate(parameters):
                if isinstance(data[param_id], IOBase):
                    binary = True
                    try:
                        binary = 'b' not in data[param_id].mode
                    except AttributeError:
                        pass
                    self.cmd_stmt_send_long_data(statement_id, param_id,
                                                 data[param_id])
                    long_data_used[param_id] = (binary,)

        execute_packet = self._protocol.make_stmt_execute(
            statement_id, data, tuple(parameters), flags,
            long_data_used, self.charset)
        packet = self._send_cmd(ServerCmd.STMT_EXECUTE, packet=execute_packet)
        result = self._handle_binary_result(packet)
        return result
recipe-576445.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def is_file(ob):
    return isinstance(ob, io.IOBase)
bhtsne.py 文件源码 项目:mbin 作者: fanglab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _is_filelike_object(f):
    try:
        return isinstance(f, (file, io.IOBase))
    except NameError:
        # 'file' is not a class in python3
        return isinstance(f, io.IOBase)
tabulate.py 文件源码 项目:synergy-service 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _is_file(f):
        return isinstance(f, io.IOBase)
client.py 文件源码 项目:aiosparql 作者: aio-libs 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def put(self, data: Union[bytes, IOBase], *, format: str,
                  graph: Optional[IRI] = None):
        async with self._crud_request("PUT", graph=graph, data=data,
                                      content_type=format) as resp:
            resp.raise_for_status()
client.py 文件源码 项目:aiosparql 作者: aio-libs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def post(self, data: Union[bytes, IOBase], *, format: str,
                   graph: Optional[IRI] = None):
        async with self._crud_request("POST", graph=graph, data=data,
                                      content_type=format) as resp:
            resp.raise_for_status()


问题


面经


文章

微信
公众号

扫码关注公众号