python类contextmanager()的实例源码

test_weakref.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_weak_keys_destroy_while_iterating(self):
        """Check weak-keyed dict iteration while keys are being destroyed.

        First runs the shared destroy-while-iterating check against each
        view of a weak-keyed dict, then runs the destroy-and-mutate check
        on a fresh dict using a context manager that drops one object in
        the middle of an active ``items()`` iteration.
        """
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        weak_dict, objects = self.make_weak_keyed_dict()
        for view_name in ('keys', 'items', 'values', 'keyrefs'):
            self.check_weak_destroy_while_iterating(weak_dict, objects,
                                                    view_name)

        # Start over with a fresh dict for the mutation scenario.
        weak_dict, objects = self.make_weak_keyed_dict()

        @contextlib.contextmanager
        def testcontext():
            try:
                iterator = iter(weak_dict.items())
                next(iterator)
                # Schedule a key/value for removal and recreate it
                value = objects.pop().arg
                gc.collect()      # just in case
                yield Object(value), value
            finally:
                # Dropping the iterator should commit all pending removals.
                iterator = None
                gc.collect()

        self.check_weak_destroy_and_mutate_while_iterating(weak_dict,
                                                           testcontext)
test_weakref.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_weak_values_destroy_while_iterating(self):
        """Check weak-valued dict iteration while values are being destroyed.

        First runs the shared destroy-while-iterating check against each
        view of a weak-valued dict, then runs the destroy-and-mutate check
        on a fresh dict using a context manager that drops one object in
        the middle of an active ``items()`` iteration.
        """
        # Issue #7105: iterators shouldn't crash when a key is implicitly removed
        weak_dict, objects = self.make_weak_valued_dict()
        view_names = ('keys', 'items', 'values', 'itervaluerefs', 'valuerefs')
        for view_name in view_names:
            self.check_weak_destroy_while_iterating(weak_dict, objects,
                                                    view_name)

        # Start over with a fresh dict for the mutation scenario.
        weak_dict, objects = self.make_weak_valued_dict()

        @contextlib.contextmanager
        def testcontext():
            try:
                iterator = iter(weak_dict.items())
                next(iterator)
                # Schedule a key/value for removal and recreate it
                key = objects.pop().arg
                gc.collect()      # just in case
                yield key, Object(key)
            finally:
                # Dropping the iterator should commit all pending removals.
                iterator = None
                gc.collect()

        self.check_weak_destroy_and_mutate_while_iterating(weak_dict,
                                                           testcontext)
unpack_kdbg_kit.py 文件源码 项目:rvmi-rekall 作者: fireeye 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run_xz_decompress(stdout, stderr=None):
    """Spawn ``xz --decompress --stdout`` and yield the process object.

    This is a generator meant to be driven as a context manager (the
    decorator is not part of this block). The child's stdin is a pipe the
    caller writes compressed data to; its stdout and stderr go to the
    given arguments.

    On exit the child's stdin is closed and the process is waited for.

    Raises:
        OSError: if no process object was created. Because it is raised
            from the ``finally`` clause, it supersedes whatever exception
            ``Popen`` itself raised.
        IOError: if xz exits with a non-zero status.
    """
    proc = None
    try:
        command = ["xz", "--decompress", "--stdout"]
        proc = subprocess.Popen(command, stdin=subprocess.PIPE,
                                stdout=stdout, stderr=stderr)
        yield proc
    finally:
        if not proc:
            raise OSError("You must have an 'xz' binary in PATH.")

        # Closing stdin signals end-of-input so xz can finish.
        proc.stdin.close()
        exit_status = proc.wait()

        if exit_status != 0:
            raise IOError("xz --decompress returned %d." % exit_status)
contexttools.py 文件源码 项目:kur 作者: deepgram 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def redirect_stderr(x):
    """ Redirects stderr to another file-like object.

        This is some compatibility code to support Python 3.4.

        Uses `contextlib.redirect_stderr` when the running interpreter
        provides it; otherwise falls back to a minimal stand-in.

        Args:
            x: The file-like object that should receive stderr output.

        Returns:
            A context manager. While it is active `sys.stderr` is `x`; on
            exit (normal or via an exception) the previous `sys.stderr`
            is put back.
    """
    if hasattr(contextlib, 'redirect_stderr'):
        return contextlib.redirect_stderr(x)

    @contextlib.contextmanager
    def result(target):
        """ Stand-in for Python 3.5's `redirect_stderr`.

            Notes: Non-reentrant, non-threadsafe
        """
        old_stderr = sys.stderr
        sys.stderr = target
        try:
            # Yield the target, matching what the stdlib version returns
            # from `__enter__`.
            yield target
        finally:
            # Restore even when the managed block raises.
            sys.stderr = old_stderr

    return result(x)

###############################################################################
common.py 文件源码 项目:BigBrotherBot-For-UrT43 作者: ptitbigorneau 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def query2(self, query, bindata=None):
        """
        Variant of the query method designed for the python 'with' statement.
        It is written as a generator so that, once wrapped as a context
        manager, the cursor returned by self.query() is closed when execution
        leaves the 'with' block, whether or not an exception occurred.
        Example:

        >> with self.console.storage.query2(query) as cursor:
        >>     if not cursor.EOF:
        >>         cursor.getRow()
        >>         ...

        :param query: The query to execute.
        :param bindata: Data to bind to the given query.
        """
        active_cursor = None
        try:
            active_cursor = self.query(query, bindata)
            yield active_cursor
        finally:
            # Nothing to close when self.query() raised before returning
            # (or returned a falsy cursor).
            if active_cursor:
                active_cursor.close()
config_revamp.py 文件源码 项目:mysql_streamer 作者: Yelp 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def reconfigure(ns=namespace, **kwargs):
    """Temporarily apply ``kwargs`` as configuration values in a namespace.

    The values the namespace currently holds for the overridden keys are
    captured first. When the generator is resumed or closed, the namespace
    is cleared and rebuilt from its non-overridden values at that moment
    plus the captured originals.

    Args:
        ns: Namespace of the conf
        **kwargs: Configuration keys and the values to apply for the
            duration of the block.
    """
    conf_namespace = staticconf.config.get_namespace(ns)
    # Snapshot only the keys that are about to be overridden.
    overridden = dict(
        (key, value)
        for key, value in conf_namespace.get_config_values().iteritems()
        if key in kwargs
    )
    staticconf.DictConfiguration(kwargs, namespace=ns)
    try:
        yield
    finally:
        # Keep whatever else is in the namespace now, then put the
        # snapshot back on top of it.
        restored = dict(
            (key, value)
            for key, value in conf_namespace.get_config_values().iteritems()
            if key not in kwargs
        )
        restored.update(overridden)
        staticconf.config.get_namespace(ns).clear()
        staticconf.DictConfiguration(restored, namespace=ns)
parse_replication_stream_internal.py 文件源码 项目:mysql_streamer 作者: Yelp 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _register_signal_handlers(self):
        """Install shutdown and profiler signal handlers around a block.

        SIGINT and SIGTERM are routed to ``self._handle_shutdown_signal``
        and SIGUSR2 to ``self._handle_profiler_signal`` (per the original
        docstring, SIGUSR2 toggles a profiler on and off). Control is then
        yielded to the caller.

        On exit, SIGUSR2 is reset to ``signal.SIG_DFL`` and the profiler is
        disabled if ``self._profiler_running`` is set. The SIGINT and
        SIGTERM handlers are not restored here.
        """
        try:
            signal.signal(signal.SIGINT, self._handle_shutdown_signal)
            signal.signal(signal.SIGTERM, self._handle_shutdown_signal)
            signal.signal(signal.SIGUSR2, self._handle_profiler_signal)
            yield
        finally:
            # Cleanup for the profiler signal handler has to happen here,
            # because signals that are handled don't unwind up the stack in the
            # way that normal methods do.  Any contextmanager or finally
            # statement won't live past the handler function returning.
            signal.signal(signal.SIGUSR2, signal.SIG_DFL)
            if self._profiler_running:
                self._disable_profiler()
test_driver.py 文件源码 项目:Trusted-Platform-Module-nova 作者: BU-NU-CLOUD-SP16 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_create_without_pause(self):
        """Check that an LXC domain is created unpaused and never resumed.

        With ``virt_type='lxc'`` and the driver's disk handler, VIF
        plugging, firewall driver, domain creation and cleanup all patched
        out, ``_create_domain_and_network`` must call ``_create_domain``
        with a falsy ``pause`` keyword and must not call ``resume`` on the
        returned domain.
        """
        self.flags(virt_type='lxc', group='libvirt')

        @contextlib.contextmanager
        def fake_lxc_disk_handler(*args, **kwargs):
            # No-op replacement for the real LXC disk handler.
            yield

        drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
        instance = objects.Instance(**self.test_instance)

        # The names in the ``as`` tuple are listed in the same order as the
        # patches so each name refers to the mock it describes (previously
        # ``cleanup`` and ``plug_vifs`` were swapped). This assumes
        # ``test.nested`` yields the entered mocks in patch order, which the
        # existing use of ``create`` already relies on.
        with test.nested(
              mock.patch.object(drvr, '_lxc_disk_handler',
                                side_effect=fake_lxc_disk_handler),
              mock.patch.object(drvr, 'plug_vifs'),
              mock.patch.object(drvr, 'firewall_driver'),
              mock.patch.object(drvr, '_create_domain'),
              mock.patch.object(drvr, 'cleanup')) as (
              _handler, plug_vifs, firewall_driver, create, cleanup):
            domain = drvr._create_domain_and_network(self.context, 'xml',
                                                     instance, None, None)
            # ``pause`` is read from the keyword arguments of the first call.
            self.assertEqual(0, create.call_args_list[0][1]['pause'])
            self.assertEqual(0, domain.resume.call_count)
test_contextlib2.py 文件源码 项目:contextlib2 作者: jazzband 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_contextmanager_finally(self):
        """A ``finally`` clause in the generator runs when the body raises.

        The recorded events must show setup (1), the body appending the
        yielded value (42), then the generator's cleanup (999), and the
        ZeroDivisionError must still propagate out of the ``with`` block.
        """
        events = []

        @contextmanager
        def woohoo():
            events.append(1)
            try:
                yield 42
            finally:
                events.append(999)

        with self.assertRaises(ZeroDivisionError):
            with woohoo() as yielded:
                self.assertEqual(events, [1])
                self.assertEqual(yielded, 42)
                events.append(yielded)
                raise ZeroDivisionError()
        self.assertEqual(events, [1, 42, 999])
test_contextlib2.py 文件源码 项目:contextlib2 作者: jazzband 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_contextmanager_except(self):
        """An ``except`` clause in the generator can swallow the body's error.

        The body raises ``ZeroDivisionError(999)``; the generator catches
        it, records its first argument, and returns normally, so nothing
        propagates out of the ``with`` block.
        """
        events = []

        @contextmanager
        def woohoo():
            events.append(1)
            try:
                yield 42
            except ZeroDivisionError as exc:
                events.append(exc.args[0])
                self.assertEqual(events, [1, 42, 999])

        with woohoo() as yielded:
            self.assertEqual(events, [1])
            self.assertEqual(yielded, 42)
            events.append(yielded)
            raise ZeroDivisionError(999)
        self.assertEqual(events, [1, 42, 999])
test_contextlib2.py 文件源码 项目:contextlib2 作者: jazzband 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_contextmanager_except_pep479(self):
        """StopIteration raised in the body must propagate unchanged.

        The context manager is compiled from a source string so that the
        ``generator_stop`` future import applies to the generator function.
        Raising a ``StopIteration`` instance inside the ``with`` block must
        surface that very same exception object rather than being
        suppressed.
        """
        # Source is flush-left because it is executed as a standalone module
        # body; the future import has to be its first statement.
        code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
        # One dict serves as both globals and locals for the exec'd code.
        locals = {}
        exec(code, locals, locals)
        woohoo = locals['woohoo']

        stop_exc = StopIteration('spam')
        try:
            with woohoo():
                raise stop_exc
        except Exception as ex:
            # Identity check: it must be the original instance.
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')
test_contextlib2.py 文件源码 项目:contextlib2 作者: jazzband 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_contextmanager_as_decorator(self):
        """A context manager instance can be used as a function decorator.

        Each call of the decorated function must enter the context, run the
        function body, then exit the context. This must also hold on a
        second call of the same decorated function.
        """
        @contextmanager
        def woohoo(y):
            # ``state`` is looked up in the enclosing scope at call time, so
            # rebinding it below is visible here.
            state.append(y)
            yield
            state.append(999)

        state = []
        @woohoo(1)
        def test(x):
            self.assertEqual(state, [1])
            state.append(x)
        test('something')
        self.assertEqual(state, [1, 'something', 999])

        # Issue #11647: Ensure the decorated function is 'reusable'
        state = []
        test('something else')
        self.assertEqual(state, [1, 'something else', 999])

# Detailed exception chaining checks only make sense on Python 3
traitlets.py 文件源码 项目:yatta_reader 作者: sound88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def cross_validation_lock(self):
        """
        Generator that runs a block with ``self._cross_validation_lock`` set.

        If the lock is already set, the block simply runs and the lock is
        left untouched. Otherwise the lock is set to True for the duration
        of the block and reset to False afterwards, even if the block
        raises.
        """
        if not self._cross_validation_lock:
            try:
                self._cross_validation_lock = True
                yield
            finally:
                self._cross_validation_lock = False
        else:
            # Already locked by an outer caller: leave the lock alone.
            yield
test_helpers.py 文件源码 项目:Hawkeye 作者: tozhengxq 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def setup_with_context_manager(testcase, cm):
    """Enter a context manager for the lifetime of a test case.

    Given a context manager normally used as::

        with ctxmgr(a, b, c) as v:
            # do something with v

    call this function from ``setUp`` to enter it immediately and have its
    exit registered as a cleanup on the test case::

        def setUp(self):
            self.v = setup_with_context_manager(self, ctxmgr(a, b, c))

        def test_foo(self):
            # do something with self.v

    The exit is always registered with ``(None, None, None)``, i.e. as if
    the block had finished without an exception.

    Returns whatever the context manager's ``__enter__`` returned.
    """
    entered_value = cm.__enter__()
    no_exception = (None, None, None)
    testcase.addCleanup(cm.__exit__, *no_exception)
    return entered_value
_utilities.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def contextmanager(fn):
        """Placeholder decorator for when context managers are unavailable.

        Returns a function carrying ``fn``'s name that raises RuntimeError
        whenever it is called, regardless of arguments.
        """
        message = ("Python 2.5 or above is required to use "
                   "context managers.")

        def oops(*args, **kw):
            raise RuntimeError(message)

        # Keep the original name so error traces point at the right function.
        oops.__name__ = fn.__name__
        return oops
recipe-498259.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def observation(observe, notify):
    """Yield an ``Observation`` proxy and close it afterwards.

    Written as a generator so that contextlib's contextmanager decorator
    can supply the __enter__ and __exit__ special methods needed by the
    'with' statement. The proxy is closed when the generator is resumed or
    closed, including when the block raised.
    """
    watcher = Observation(observe, notify)
    try:
        yield watcher
    finally:
        watcher.close()
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def named_context(self, name):
        """Build a context manager factory that tracks ``name`` on a stack.

        Entering the returned context appends ``name`` to
        ``self.named_contexts``; leaving it pops the last entry and asserts
        that it is ``name``, so contexts must be exited in reverse order of
        entry.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                innermost = self.named_contexts.pop()
                self.assertEqual(innermost, name)

        return context
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def named_context(self, name):
        """Build a context manager factory that tracks ``name`` on a stack.

        Entering the returned context appends ``name`` to
        ``self.named_contexts``; leaving it pops the last entry and asserts
        that it is ``name``, so contexts must be exited in reverse order of
        entry.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                innermost = self.named_contexts.pop()
                self.assertEqual(innermost, name)

        return context
gen_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def named_context(self, name):
        """Build a context manager factory that tracks ``name`` on a stack.

        Entering the returned context appends ``name`` to
        ``self.named_contexts``; leaving it pops the last entry and asserts
        that it is ``name``, so contexts must be exited in reverse order of
        entry.
        """
        @contextlib.contextmanager
        def context():
            self.named_contexts.append(name)
            try:
                yield
            finally:
                innermost = self.named_contexts.pop()
                self.assertEqual(innermost, name)

        return context
usb1.py 文件源码 项目:sc-controller 作者: kozec 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _validContext(func):
        """Decorator that runs ``func`` only while the context is open.

        The wrapped method holds a reference on the context for the whole
        call (or, for generator functions, for the whole iteration). If the
        context pointer is still unset after the reference is taken,
        ``func`` is not called: the plain wrapper returns None and the
        generator wrapper yields nothing.
        """
        # Defined inside USBContext so we can access "self.__*".
        @contextlib.contextmanager
        def refcount(self):
            # Take a reference under the condition's lock, opening the
            # context first when auto-open is enabled and it is not open yet.
            with self.__context_cond:
                if not self.__context_p and self.__auto_open:
                    # BBB
                    warnings.warn(
                        'Use "with USBContext() as context:" for safer cleanup'
                        ' on interpreter shutdown. See also USBContext.open().',
                        DeprecationWarning,
                    )
                    self.open()
                self.__context_refcount += 1
            try:
                yield
            finally:
                # Drop the reference; wake every waiter once the count
                # reaches zero.
                # NOTE(review): if __context_cond is a threading.Condition,
                # notifyAll is the older camelCase alias of notify_all -
                # confirm before changing.
                with self.__context_cond:
                    self.__context_refcount -= 1
                    if not self.__context_refcount:
                        self.__context_cond.notifyAll()
        # Generator functions need a generator wrapper so the reference is
        # held until iteration finishes, not just until the generator object
        # is created.
        if inspect.isgeneratorfunction(func):
            def wrapper(self, *args, **kw):
                with refcount(self):
                    if self.__context_p:
                        # pylint: disable=not-callable
                        for value in func(self, *args, **kw):
                            # pylint: enable=not-callable
                            yield value
        else:
            def wrapper(self, *args, **kw):
                with refcount(self):
                    if self.__context_p:
                        # pylint: disable=not-callable
                        return func(self, *args, **kw)
                        # pylint: enable=not-callable
        # Copy func's metadata onto whichever wrapper was defined.
        functools.update_wrapper(wrapper, func)
        return wrapper
    # pylint: enable=no-self-argument,protected-access


问题


面经


文章

微信
公众号

扫码关注公众号