_signals.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:trio 作者: python-trio 项目源码 文件源码
def catch_signals(signals):
    """A context manager for catching signals.

    Entering this context manager starts listening for the given signals and
    returns an async iterator; exiting the context manager stops listening.

    The async iterator blocks until at least one signal has arrived, and then
    yields a :class:`set` containing all of the signals that were received
    since the last iteration.

    Note that if you leave the ``with`` block while the iterator has
    unextracted signals still pending inside it, then they will be
    re-delivered using Python's regular signal handling logic. This avoids a
    race condition when signals arrives just before we exit the ``with``
    block.

    Args:
      signals: a set of signals to listen for.

    Raises:
      RuntimeError: if you try to use this anywhere except Python's main
          thread. (This is a Python limitation.)

    Example:

      A common convention for Unix daemons is that they should reload their
      configuration when they receive a ``SIGHUP``. Here's a sketch of what
      that might look like using :func:`catch_signals`::

         with trio.catch_signals({signal.SIGHUP}) as batched_signal_aiter:
             async for batch in batched_signal_aiter:
                 # We're only listening for one signal, so the batch is always
                 # {signal.SIGHUP}, but if we were listening to more signals
                 # then it could vary.
                 for signum in batch:
                     assert signum == signal.SIGHUP
                     reload_configuration()

    """
    if threading.current_thread() != threading.main_thread():
        raise RuntimeError(
            "Sorry, catch_signals is only possible when running in the "
            "Python interpreter's main thread"
        )
    token = _core.current_trio_token()
    queue = SignalQueue()

    def handler(signum, _):
        token.run_sync_soon(queue._add, signum, idempotent=True)

    try:
        with _signal_handler(signals, handler):
            yield queue
    finally:
        queue._redeliver_remaining()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号