Python 函数 get_ident() 的实例源码

graph.py 文件源码 项目:inferno 作者: inferno-pytorch 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __init__(self, graph=None):
        """
        Set up a new graph container.

        Parameters
        ----------
            graph : networkx.DiGraph or NNGraph
                Optional graph to wrap. When it is ``None`` a fresh
                ``NNGraph`` is created instead.
        """
        super(Graph, self).__init__()
        # Private bookkeeping: an (initially empty) thread -> graph mapping and
        # the identity of the thread / process that constructed this object.
        self._thread_to_graph_mapping = {}
        self._creator_thread = threading.get_ident()
        self._creator_pid = mp.current_process().pid
        # Public state: use the caller's graph when given, else start empty.
        self.graph = NNGraph() if graph is None else graph
chainmap.py 文件源码 项目:deb-python-pint 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _recursive_repr(fillvalue='...'):
    """Build a decorator that short-circuits recursive ``repr`` calls.

    The decorated function is expected to take only ``self``.  While a call
    for a given object is in progress on the current thread, a nested call
    for the same object on the same thread returns ``fillvalue`` instead of
    invoking the wrapped function again.
    """

    def decorating_function(user_function):
        # (object id, thread id) pairs whose repr is currently being computed.
        active = set()

        def wrapper(self):
            marker = (id(self), get_ident())
            if marker in active:
                return fillvalue
            active.add(marker)
            try:
                return user_function(self)
            finally:
                active.discard(marker)

        # Can't use functools.wraps() here because of bootstrap issues,
        # so the metadata is copied over by hand.
        for attr in ('__module__', '__doc__', '__name__'):
            setattr(wrapper, attr, getattr(user_function, attr))
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
common.py 文件源码 项目:scarlett_os 作者: bossjones 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def async_test_scarlett_os(loop):
    """Create a ``ScarlettSystem`` on ``loop`` with fixed test configuration.

    The returned object has its config pointed at the test config directory
    and its state set to ``CoreState.running``.
    """
    # Disabled in the original: loop._thread_ident = threading.get_ident()

    system = s.ScarlettSystem(loop)

    cfg = system.config
    cfg.location_name = 'test scarlett'
    cfg.config_dir = get_test_config_dir()
    cfg.latitude = 32.87336
    cfg.longitude = -117.22743
    cfg.elevation = 0
    cfg.time_zone = date_utility.get_time_zone('US/Pacific')
    cfg.units = METRIC_SYSTEM
    cfg.skip_pip = True

    # Disabled in the original:
    # if 'custom_automations.test' not in loader.AVAILABLE_COMPONENTS:
    #     yield from loop.run_in_executor(None, loader.prepare, ss)

    system.state = s.CoreState.running
    return system
threaded.py 文件源码 项目:tasker 作者: wavenator 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def pre_work(
        self,
        task,
    ):
        """Record ``task`` as current and arm the soft-timeout timer.

        A ``threading.Timer`` is stored in ``self.current_timers`` under the
        calling thread's ident and started.  When it fires it calls
        ``PyThreadState_SetAsyncExc`` with that ident and
        ``worker.WorkerSoftTimedout``.
        """
        self.update_current_task(
            task=task,
        )

        soft_timeout = self.worker_config['timeouts']['soft_timeout']
        # A configured value of 0 is passed to the timer as None
        # (presumably "no soft timeout" -- confirm against the worker docs).
        interval = None if soft_timeout == 0 else soft_timeout

        thread_id = threading.get_ident()
        timer = threading.Timer(
            interval=interval,
            function=ctypes.pythonapi.PyThreadState_SetAsyncExc,
            args=(
                ctypes.c_long(thread_id),
                ctypes.py_object(worker.WorkerSoftTimedout),
            )
        )
        self.current_timers[thread_id] = timer
        timer.start()
__init__.py 文件源码 项目:midict 作者: ShenggaoZhu 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def __repr__(self, _repr_running={}):
        """Return the repr as ``"MIDict(items, names)"``.

        ``_repr_running`` is a deliberately shared default dict: it records
        the (object id, thread id) pairs whose repr is in progress so that a
        nested call for the same object returns a ``<Name(...)>`` placeholder.
        """
        cls_name = self.__class__.__name__
        guard = (id(self), _get_ident())
        if guard in _repr_running: # pragma: no cover
            return '<%s(...)>' % cls_name
        _repr_running[guard] = 1
        try:
            try:
                if self.indices:
                    names = force_list(self.indices.keys())
                    items = force_list(self.items())
                    return '%s(%s, %s)' % (cls_name, items, names)
            except AttributeError: # pragma: no cover
                # the ``indices`` attribute may not exist yet
                pass
            return '%s()' % cls_name
        finally:
            del _repr_running[guard]
decorators.py 文件源码 项目:web3.py 作者: ethereum 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def reject_recursive_repeats(to_wrap):
    '''
    Prevent simple cycles by rejecting re-entrant calls with the same instances.

    While a call to ``to_wrap`` with a given tuple of positional argument
    objects (compared by ``id``) is in progress on a thread, calling it again
    on that thread with the same objects raises ``ValueError`` instead of
    recursing.  The wrapper accepts positional arguments only.

    Raises (from the returned wrapper):
        ValueError: on a recursive call with the same argument objects.
    '''
    # In-progress calls, keyed by (thread id, id(arg0), id(arg1), ...).
    to_wrap.__already_called = {}

    @functools.wraps(to_wrap)
    def wrapped(*args):
        call_key = (threading.get_ident(),) + tuple(map(id, args))
        if call_key in to_wrap.__already_called:
            raise ValueError('Recursively called %s with %r' % (to_wrap, args))
        to_wrap.__already_called[call_key] = True
        try:
            return to_wrap(*args)
        finally:
            # Always clear the marker so later, non-nested calls are allowed.
            del to_wrap.__already_called[call_key]
    return wrapped
test_signal.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def test_pthread_kill_main_thread(self):
        # Check that pthread_kill() can deliver a signal to the main thread
        # before any other thread has been created (see issue #12392).
        # The child exits with 3 from its SIGUSR1 handler; it exits with 2 if
        # it gets past the pthread_kill() call without the handler exiting.
        code = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

        with spawn_python('-c', code) as child:
            out, err = child.communicate()
            status = child.wait()
            if status != 3:
                raise Exception("Child error (exit code %s): %s" %
                                (status, out))
lock_tests.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads that each run the function `f`.

        Every thread appends its ident to `self.started` before calling `f`
        and to `self.finished` afterwards.  If `wait_before_exit` is True the
        threads then keep waiting until `self._can_exit` becomes true (the
        original documentation names do_finish() as the call that does this).
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def run_one():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                self.finished.append(ident)
                while not self._can_exit:
                    _wait()

        for _ in range(n):
            start_new_thread(run_one, ())
test_threaded_import.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def task(N, done, done_tasks, errors):
    """Import ``random`` and ``modulefinder`` and record completion.

    The import order alternates with the parity of ``len(done_tasks)``.  Any
    exception is appended to ``errors`` with its traceback stripped.  The
    calling thread's ident is always appended to ``done_tasks``, and ``done``
    is set once ``done_tasks`` holds ``N`` entries.
    """
    try:
        # modulefinder itself is not used; it is imported only to stress
        # importing different modules from several threads.
        if len(done_tasks) % 2 == 0:
            import random
            import modulefinder
        else:
            import modulefinder
            import random
        # This will fail if random is not completely initialized
        random.randrange(1, 3)
    except Exception as exc:
        errors.append(exc.with_traceback(None))
    finally:
        done_tasks.append(threading.get_ident())
        if len(done_tasks) == N:
            done.set()

# Create a circular import structure: A -> C -> B -> D -> A
# NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
test_capi.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_thread_state(self):
        # Extra thread-state checks driven through _testcapi.
        def target():
            recorded = []

            def callback():
                recorded.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # Two extra references to the callback, kept exactly as in the
            # original (their purpose is not visible from this block).
            a = b = callback
            time.sleep(1)
            # The ident of the thread running target() must have been
            # recorded exactly 3 times.
            self.assertEqual(recorded.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the check in the calling thread first, then in a new thread.
        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()
base_events.py 文件源码 项目:golightan 作者: shirou 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def _check_thread(self):
        """Verify the caller is on the thread recorded in ``self._thread_id``.

        Does nothing when ``self._thread_id`` is None.  Otherwise raises
        RuntimeError if the calling thread's ident differs from it.

        Non-thread-safe methods of this class rely on this assumption.  Meant
        to be called only when (self._debug == True); the caller checks that
        condition itself for performance reasons.
        """
        if self._thread_id is not None and threading.get_ident() != self._thread_id:
            raise RuntimeError(
                "Non-thread-safe operation invoked on an event loop other "
                "than the current one")
patcher.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _green_existing_locks():
    """Make locks created before monkey-patching safe.

    RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
    blocks the native thread, so those need to be replaced with green Locks.
    Every object known to the garbage collector is scanned and each matching
    RLock is handed to the version-specific fixer.

    This was originally noticed in the stdlib logging module."""
    import gc
    import threading
    import eventlet.green.thread
    major = sys.version_info[0]
    lock_type = type(threading.Lock())
    rlock_type = type(threading.RLock())
    if major >= 3:
        pyrlock_type = type(threading._PyRLock())
    # We're monkey-patching so there can't be any greenlets yet, ergo our thread
    # ID is the only valid owner possible.
    tid = eventlet.green.thread.get_ident()
    for obj in gc.get_objects():
        if not isinstance(obj, rlock_type):
            continue
        if major == 2 and isinstance(obj._RLock__block, lock_type):
            _fix_py2_rlock(obj, tid)
        elif major >= 3 and not isinstance(obj, pyrlock_type):
            _fix_py3_rlock(obj)
app.py 文件源码 项目:data-store 作者: HumanCellAtlas 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def timeout_response() -> chalice.Response:
    """
    Build a chalice Response reporting a gateway timeout.

    The JSON body is a problem document whose ``traces`` entry holds the
    formatted stack of every running thread except the calling one.
    """
    all_frames = sys._current_frames()
    own_id = threading.get_ident()
    traces = {}
    for tid, frame in all_frames.items():
        if tid == own_id:
            continue
        traces[tid] = traceback.format_stack(frame)

    payload = {
        'status': requests.codes.gateway_timeout,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': traces,
    }
    return chalice.Response(
        status_code=payload['status'],
        headers={"Content-Type": "application/problem+json"},
        body=json.dumps(payload),
    )
test_connection.py 文件源码 项目:pymysqlpool 作者: 0xE8551CCB 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_with_multi_threading():
    """Run ``test_insert_one`` 100 times in each of 50 threads.

    ``test_truncate`` runs first and ``test_query`` runs after every thread
    has been joined.
    """
    test_truncate()

    def task(n):
        print('In thread {}'.format(threading.get_ident()))
        for _ in range(n):
            test_insert_one()

    threads = []
    for _ in range(50):
        threads.append(threading.Thread(target=task, args=(100,)))

    # Start every thread before joining any of them so they run concurrently.
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    test_query()
base_events.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def run_forever(self):
        """Run iterations of the loop until ``self._stopping`` becomes true.

        Raises RuntimeError if the loop is already running.  On exit the
        stopping flag and recorded thread id are reset and the coroutine
        wrapper is switched off, even when an iteration raises.
        """
        self._check_closed()
        if self.is_running():
            raise RuntimeError('Event loop is running.')
        self._set_coroutine_wrapper(self._debug)
        self._thread_id = threading.get_ident()
        try:
            # At least one iteration always runs before the flag is consulted.
            self._run_once()
            while not self._stopping:
                self._run_once()
        finally:
            self._stopping = False
            self._thread_id = None
            self._set_coroutine_wrapper(False)
base_events.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _check_thread(self):
        """Verify the caller is on the thread recorded in ``self._thread_id``.

        Does nothing when ``self._thread_id`` is None.  Otherwise raises
        RuntimeError if the calling thread's ident differs from it.

        Non-thread-safe methods of this class rely on this assumption.  Meant
        to be called only when (self._debug == True); the caller checks that
        condition itself for performance reasons.
        """
        if self._thread_id is not None and threading.get_ident() != self._thread_id:
            raise RuntimeError(
                "Non-thread-safe operation invoked on an event loop other "
                "than the current one")
test_signal.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_pthread_kill_main_thread(self):
        # Check that pthread_kill() can deliver a signal to the main thread
        # before any other thread has been created (see issue #12392).
        # The child exits with 3 from its SIGUSR1 handler; it exits with 2 if
        # it gets past the pthread_kill() call without the handler exiting.
        code = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

        with spawn_python('-c', code) as child:
            out, err = child.communicate()
            status = child.wait()
            if status != 3:
                raise Exception("Child error (exit code %s): %s" %
                                (status, out))
test_capi.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def test_thread_state(self):
        # Extra thread-state checks driven through _testcapi.
        def target():
            recorded = []

            def callback():
                recorded.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # Two extra references to the callback, kept exactly as in the
            # original (their purpose is not visible from this block).
            a = b = callback
            time.sleep(1)
            # The ident of the thread running target() must have been
            # recorded exactly 3 times.
            self.assertEqual(recorded.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the check in the calling thread first, then in a new thread.
        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_main_thread_after_fork(self):
        # Fork from the main thread of a child interpreter; the forked process
        # prints the name of its main thread and whether that thread's ident
        # matches both current_thread().ident and get_ident().
        code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        """
        rc, stdout, stderr = assert_python_ok("-c", code)
        # Drop carriage returns before comparing the captured output.
        text = stdout.decode().replace('\r', '')
        self.assertEqual(stderr, b"")
        self.assertEqual(text, "MainThread\nTrue\nTrue\n")
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_main_thread_after_fork_from_nonmain_thread(self):
        # Fork from a secondary thread of a child interpreter; the forked
        # process prints the name of its main thread and whether that thread's
        # ident matches both current_thread().ident and get_ident().
        code = """if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        """
        rc, stdout, stderr = assert_python_ok("-c", code)
        # Drop carriage returns before comparing the captured output.
        text = stdout.decode().replace('\r', '')
        self.assertEqual(stderr, b"")
        self.assertEqual(text, "Thread-1\nTrue\nTrue\n")
client.py 文件源码 项目:Pyro5 作者: irmen 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def __init__(self, uri):
        """Initialise proxy state for the remote object located at ``uri``.

        ``uri`` may be a string (converted with ``core.URI``) or a
        ``core.URI``.  Also resets ``core.current_context.annotations`` and
        ``core.current_context.response_annotations`` to empty dicts.

        Raises:
            TypeError: if ``uri`` is neither a str nor a ``core.URI``.
            ValueError: if ``config.SERIALIZER`` is not a known serializer.
        """
        if not isinstance(uri, (str, core.URI)):
            raise TypeError("expected Pyro URI")
        if isinstance(uri, str):
            uri = core.URI(uri)
        self._pyroUri = uri
        self._pyroConnection = None
        # can be set to the name of a serializer to override the global one per-proxy
        self._pyroSerializer = None
        # methods, attributes and oneway-methods of the remote object, gotten from meta-data
        self._pyroMethods = set()
        self._pyroAttrs = set()
        self._pyroOneway = set()
        # message sequence number
        self._pyroSeq = 0
        # internal switch to enable wire level responses
        self._pyroRawWireResponse = False
        # the data object that should be sent in the initial connection handshake message
        self._pyroHandshake = "hello"
        self._pyroMaxRetries = config.MAX_RETRIES
        self.__pyroTimeout = config.COMMTIMEOUT
        # the thread that owns this proxy
        self.__pyroOwnerThread = get_ident()
        if config.SERIALIZER not in serializers.serializers:
            raise ValueError("unknown serializer configured")
        core.current_context.annotations = {}
        core.current_context.response_annotations = {}
core.py 文件源码 项目:asgi_rabbitmq 作者: proofit404 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def schedule(self, f, *args, **kwargs):
        """
        Hand a protocol method call to the connection and return its Future.

        Waits for the connection to be open, then, while holding the
        connection access lock, passes the calling thread's ident, the
        ``(f, args, kwargs)`` call description and a fresh Future to
        ``self.process``.  The Future is returned so another thread can wait
        on it.
        """

        self.wait_open()
        # RabbitMQ operations are multiplexed between different AMQP
        # method callbacks.  The final result of the protocol method call
        # is set inside one of those callbacks, so the other thread can
        # wait until that happens in the connection event loop.
        result = Future()
        call = (f, args, kwargs)
        with self.lock:
            self.process(get_ident(), call, result)
        return result
tracing.py 文件源码 项目:nodepy 作者: nodepy 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def format_html(fp, exclude=()):
  """Write an HTML page listing the stack of every thread to ``fp``.

  Threads are emitted in ascending thread-id order, each under a heading
  that marks the calling ("tracing") thread and the main thread.  When
  ``pygments`` is available the stack text is syntax-highlighted; otherwise
  it is HTML-escaped and wrapped in ``<pre>`` so that frame names such as
  ``<module>`` and the line breaks survive in the rendered page.

  Parameters:
    fp: object with a ``write(str)`` method that receives the HTML.
    exclude: accepted for interface compatibility.
      NOTE(review): this argument is not used anywhere in this function --
      confirm the intended filtering before relying on it.
  """
  import html  # local import: only the plain-text fallback needs it

  frames = stackframes()
  tracing_thread_id = threading.get_ident()  # loop-invariant, look up once
  fp.write('<!DOCTYPE html>\n')
  fp.write('<html><head><title>{} Traces</title></head><body>\n'.format(len(frames)))
  for thread_id, stack in sorted(frames.items(), key=lambda x: x[0]):
    name = 'Thread {}'.format(thread_id)
    if thread_id == tracing_thread_id:
      name += ' (tracing thread)'
    elif thread_id == main_thread.ident:
      name += ' (main)'
    fp.write('<h3>{}</h3>\n'.format(name))
    tbstr = format_stack(stack)
    if pygments:
      formatter = pygments.formatters.HtmlFormatter(full=False, noclasses=True)
      lexer = pygments.lexers.PythonLexer()
      tbstr = pygments.highlight(tbstr, lexer, formatter)
    else:
      # Raw traceback text is not valid HTML: escape it and keep its layout.
      tbstr = '<pre>{}</pre>'.format(html.escape(tbstr))
    fp.write(tbstr)
    fp.write('\n')
  # Close both elements opened in the page header.
  fp.write('</body></html>\n')
test_signals.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_async():
    """Check which thread signal handlers run in, per handler and per signal.

    Handlers registered with the ``async`` option on must run off the
    registering thread; handlers registered without it follow the signal's
    current ``async`` setting.

    ``async`` is a reserved keyword since Python 3.7, so it can no longer be
    written as a keyword argument or attribute name.  The same name is passed
    via dict unpacking and ``setattr`` instead, which leaves the calls made
    on ``on_test`` unchanged.
    """
    from threading import get_ident

    main = get_ident()

    @on_test.register(**{'async': True})
    def a1():
        assert get_ident() != main

    @on_test.register()
    def a2():
        assert get_ident() == main

    on_test()

    setattr(on_test, 'async', True)

    @on_test.register()  # follows the current setting
    def a3():
        assert get_ident() != main

    on_test()

    # Restore the signal's setting for whatever runs afterwards.
    setattr(on_test, 'async', False)
gevent.py 文件源码 项目:easypy 作者: weka-io 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, pool_size):
        """Create the single pool instance and start its worker threads.

        Registers ``self`` as the class-level ``POOL`` (asserting none exists
        yet) and starts ``pool_size`` daemon threads named ``real-thread-<i>``.
        Each thread loops forever, taking ``(func, name, job_id, parent_uuid)``
        tuples from ``self.jobs_queue``, calling ``func`` and then removing
        ``job_id`` from ``self.active_jobs``.
        """
        cls = self.__class__
        assert not cls.POOL, "Can't create more than one: %s" % cls
        cls.POOL = self
        self.active_jobs = set()
        self.pool_size = pool_size
        self.jobs_queue = Queue()

        def serve_jobs():
            while True:
                job, label, job_id, parent_uuid = self.jobs_queue.get()
                _set_thread_uuid(threading.get_ident(), parent_uuid)
                _logger.debug('Starting job in real thread: %s', label or "<anonymous>")
                job()
                self.active_jobs.remove(job_id)
                _logger.debug('ready for the next job')

        for index in range(self.pool_size):
            worker = threading.Thread(
                target=serve_jobs, name='real-thread-%s' % index, daemon=True)
            worker.start()
test_signal.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_pthread_kill_main_thread(self):
        # Check that pthread_kill() can deliver a signal to the main thread
        # before any other thread has been created (see issue #12392).
        # The child exits with 3 from its SIGUSR1 handler; it exits with 2 if
        # it gets past the pthread_kill() call without the handler exiting.
        code = """if True:
            import threading
            import signal
            import sys

            def handler(signum, frame):
                sys.exit(3)

            signal.signal(signal.SIGUSR1, handler)
            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
            sys.exit(2)
        """

        with spawn_python('-c', code) as child:
            out, err = child.communicate()
            status = child.wait()
            if status != 3:
                raise Exception("Child error (exit code %s): %s" %
                                (status, out))
lock_tests.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads that each run the function `f`.

        Every thread appends its ident to `self.started` before calling `f`
        and to `self.finished` afterwards.  If `wait_before_exit` is True the
        threads then keep waiting until `self._can_exit` becomes true (the
        original documentation names do_finish() as the call that does this).
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def run_one():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                self.finished.append(ident)
                while not self._can_exit:
                    _wait()

        for _ in range(n):
            start_new_thread(run_one, ())
test_threaded_import.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def task(N, done, done_tasks, errors):
    """Import ``random`` and ``modulefinder`` and record completion.

    The import order alternates with the parity of ``len(done_tasks)``.  Any
    exception is appended to ``errors`` with its traceback stripped.  The
    calling thread's ident is always appended to ``done_tasks``, and ``done``
    is set once ``done_tasks`` holds ``N`` entries.
    """
    try:
        # modulefinder itself is not used; it is imported only to stress
        # importing different modules from several threads.
        if len(done_tasks) % 2 == 0:
            import random
            import modulefinder
        else:
            import modulefinder
            import random
        # This will fail if random is not completely initialized
        random.randrange(1, 3)
    except Exception as exc:
        errors.append(exc.with_traceback(None))
    finally:
        done_tasks.append(threading.get_ident())
        if len(done_tasks) == N:
            done.set()

# Create a circular import structure: A -> C -> B -> D -> A
# NOTE: `time` is already loaded and therefore doesn't threaten to deadlock.
test_capi.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_thread_state(self):
        # Extra thread-state checks driven through _testcapi.
        def target():
            recorded = []

            def callback():
                recorded.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # Two extra references to the callback, kept exactly as in the
            # original (their purpose is not visible from this block).
            a = b = callback
            time.sleep(1)
            # The ident of the thread running target() must have been
            # recorded exactly 3 times.
            self.assertEqual(recorded.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the check in the calling thread first, then in a new thread.
        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()
test_threading.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_main_thread_after_fork(self):
        # Fork from the main thread of a child interpreter; the forked process
        # prints the name of its main thread and whether that thread's ident
        # matches both current_thread().ident and get_ident().
        code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        """
        rc, stdout, stderr = assert_python_ok("-c", code)
        # Drop carriage returns before comparing the captured output.
        text = stdout.decode().replace('\r', '')
        self.assertEqual(stderr, b"")
        self.assertEqual(text, "MainThread\nTrue\nTrue\n")


问题


面经


文章

微信
公众号

扫码关注公众号