python类close()的实例源码

_fileobjectposix.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def close(self):
        """Close this object and, if it owns it, the file descriptor.

        Calling ``close()`` on an already-closed object does nothing.
        Otherwise ``flush()`` is called, the object is marked closed,
        ``hub.cancel_wait`` is invoked for the read and/or write event
        (depending on ``_readable`` / ``_writable``), and, when
        ``_closefd`` is true, the descriptor is released with
        ``os.close()``.

        The teardown runs even if ``flush()`` raises, mirroring
        ``io.IOBase.close``; the flush error then propagates to the
        caller instead of leaving the descriptor open.
        """
        if self._closed:
            return
        try:
            self.flush()
        finally:
            # Set the flag before anything else so that a nested call to
            # close() hits the early return above.
            self._closed = True
            if self._readable:
                self.hub.cancel_wait(self._read_event, cancel_wait_ex)
            if self._writable:
                self.hub.cancel_wait(self._write_event, cancel_wait_ex)
            if self._closefd:
                fileno = self._fileno
                # Forget the descriptor number before closing it so the
                # attribute never refers to a closed descriptor.
                self._fileno = None
                os.close(fileno)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675
_fileobjectposix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def close(self):
        """Close this object and, if it owns it, the file descriptor.

        Calling ``close()`` on an already-closed object does nothing.
        Otherwise ``flush()`` is called, the object is marked closed,
        ``hub.cancel_wait`` is invoked for the read and/or write event
        (depending on ``_readable`` / ``_writable``), and, when
        ``_closefd`` is true, the descriptor is released with
        ``os.close()``.

        The teardown runs even if ``flush()`` raises, mirroring
        ``io.IOBase.close``; the flush error then propagates to the
        caller instead of leaving the descriptor open.
        """
        if self._closed:
            return
        try:
            self.flush()
        finally:
            # Set the flag before anything else so that a nested call to
            # close() hits the early return above.
            self._closed = True
            if self._readable:
                self.hub.cancel_wait(self._read_event, cancel_wait_ex)
            if self._writable:
                self.hub.cancel_wait(self._write_event, cancel_wait_ex)
            if self._closefd:
                fileno = self._fileno
                # Forget the descriptor number before closing it so the
                # attribute never refers to a closed descriptor.
                self._fileno = None
                os.close(fileno)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675
_fileobjectposix.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def _do_close(self, io, closefd):
        """Close *io* and ``self.fileio``, then drop both references.

        :param io: object whose ``close()`` is called first.
        :param closefd: not used by this method; ``self.fileio``
            already knows whether or not to close the file descriptor.

        ``self.fileio.close()`` is attempted even when ``io.close()``
        raises, so a failure in the first close cannot skip the second.
        ``self._fobj`` and ``self.fileio`` are reset to ``None`` in
        every case, and any exception propagates afterwards.
        """
        try:
            try:
                io.close()
            finally:
                # self.fileio already knows whether or not to close the
                # file descriptor
                self.fileio.close()
        finally:
            self._fobj = None
            self.fileio = None
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_response_is_iterable(self):
        """Check that iterating a ``Response`` yields a truthy first chunk.

        The response's ``raw`` is a ``StringIO`` holding ``'abc'`` whose
        ``read`` has been replaced by a wrapper taking the extra
        ``decode_content`` keyword.
        """
        stream = StringIO.StringIO('abc')
        original_read = stream.read

        def read_without_decode(amt, decode_content=None):
            # ``decode_content`` is accepted but unused; only the size
            # is forwarded to the real read().
            return original_read(amt)

        stream.read = read_without_decode

        response = requests.Response()
        response.raw = stream
        assert next(iter(response))
        stream.close()
test_requests.py 文件源码 项目:ruffruffs 作者: di 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_session_close_proxy_clear(self, mocker):
        """Check that ``Session.close()`` clears the adapter's proxy managers.

        Two mocks are patched into the ``proxy_manager`` mapping of the
        ``'http://'`` adapter; after closing the session each mock's
        ``clear()`` must have been called exactly once with no arguments.
        """
        names = ('one', 'two')
        fake_managers = {name: mocker.Mock() for name in names}

        sess = requests.Session()
        http_adapter = sess.adapters['http://']
        mocker.patch.dict(http_adapter.proxy_manager, fake_managers)

        sess.close()

        for name in names:
            fake_managers[name].clear.assert_called_once_with()
test_requests.py 文件源码 项目:filegardener 作者: smorin 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_response_is_iterable(self):
        """Check that iterating a ``Response`` yields a truthy first chunk.

        The response's ``raw`` is a ``StringIO`` holding ``'abc'`` whose
        ``read`` has been replaced by a wrapper taking the extra
        ``decode_content`` keyword.
        """
        stream = StringIO.StringIO('abc')
        original_read = stream.read

        def read_without_decode(amt, decode_content=None):
            # ``decode_content`` is accepted but unused; only the size
            # is forwarded to the real read().
            return original_read(amt)

        stream.read = read_without_decode

        response = requests.Response()
        response.raw = stream
        assert next(iter(response))
        stream.close()
test_requests.py 文件源码 项目:filegardener 作者: smorin 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def test_session_close_proxy_clear(self, mocker):
        """Check that ``Session.close()`` clears the adapter's proxy managers.

        Two mocks are patched into the ``proxy_manager`` mapping of the
        ``'http://'`` adapter; after closing the session each mock's
        ``clear()`` must have been called exactly once with no arguments.
        """
        names = ('one', 'two')
        fake_managers = {name: mocker.Mock() for name in names}

        sess = requests.Session()
        http_adapter = sess.adapters['http://']
        mocker.patch.dict(http_adapter.proxy_manager, fake_managers)

        sess.close()

        for name in names:
            fake_managers[name].clear.assert_called_once_with()
test_requests.py 文件源码 项目:filegardener 作者: smorin 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_response_is_iterable(self):
        """Check that iterating a ``Response`` yields a truthy first chunk.

        The response's ``raw`` is a ``StringIO`` holding ``'abc'`` whose
        ``read`` has been replaced by a wrapper taking the extra
        ``decode_content`` keyword.
        """
        stream = StringIO.StringIO('abc')
        original_read = stream.read

        def read_without_decode(amt, decode_content=None):
            # ``decode_content`` is accepted but unused; only the size
            # is forwarded to the real read().
            return original_read(amt)

        stream.read = read_without_decode

        response = requests.Response()
        response.raw = stream
        assert next(iter(response))
        stream.close()
test_requests.py 文件源码 项目:filegardener 作者: smorin 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def test_session_close_proxy_clear(self, mocker):
        """Check that ``Session.close()`` clears the adapter's proxy managers.

        Two mocks are patched into the ``proxy_manager`` mapping of the
        ``'http://'`` adapter; after closing the session each mock's
        ``clear()`` must have been called exactly once with no arguments.
        """
        names = ('one', 'two')
        fake_managers = {name: mocker.Mock() for name in names}

        sess = requests.Session()
        http_adapter = sess.adapters['http://']
        mocker.patch.dict(http_adapter.proxy_manager, fake_managers)

        sess.close()

        for name in names:
            fake_managers[name].clear.assert_called_once_with()
_fileobjectposix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def _do_close(self, io, closefd):
        """Close *io* and ``self.fileio``, then drop both references.

        :param io: object whose ``close()`` is called first.
        :param closefd: not used by this method; ``self.fileio``
            already knows whether or not to close the file descriptor.

        ``self.fileio.close()`` is attempted even when ``io.close()``
        raises, so a failure in the first close cannot skip the second.
        ``self._fobj`` and ``self.fileio`` are reset to ``None`` in
        every case, and any exception propagates afterwards.
        """
        try:
            try:
                io.close()
            finally:
                # self.fileio already knows whether or not to close the
                # file descriptor
                self.fileio.close()
        finally:
            self._fobj = None
            self.fileio = None
test_requests.py 文件源码 项目:Codeforces-Sublime-Plugin 作者: karunk 项目源码 文件源码 阅读 87 收藏 0 点赞 0 评论 0
def test_response_is_iterable(self):
        """Check that iterating a ``Response`` yields a truthy first chunk.

        The response's ``raw`` is a ``StringIO`` holding ``'abc'`` whose
        ``read`` has been replaced by a wrapper taking the extra
        ``decode_content`` keyword.
        """
        stream = StringIO.StringIO('abc')
        original_read = stream.read

        def read_without_decode(amt, decode_content=None):
            # ``decode_content`` is accepted but unused; only the size
            # is forwarded to the real read().
            return original_read(amt)

        stream.read = read_without_decode

        response = requests.Response()
        response.raw = stream
        assert next(iter(response))
        stream.close()


问题


面经


文章

微信
公众号

扫码关注公众号