Python 函数 get_ident() 的实例源码

reprlib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def recursive_repr(fillvalue='...'):
    """Decorator to make a repr function return fillvalue for a recursive call.

    The decorated function is expected to take a single ``self`` argument.
    While a call for a given object is in progress on the current thread,
    any nested call for that same object returns *fillvalue* instead of
    invoking the wrapped function again.
    """

    def decorating_function(user_function):
        # (object id, thread ident) pairs whose repr is currently running.
        repr_running = set()

        def wrapper(self):
            key = id(self), get_ident()
            if key in repr_running:
                return fillvalue
            repr_running.add(key)
            try:
                result = user_function(self)
            finally:
                # Always clear the marker, even if the wrapped repr raised.
                repr_running.discard(key)
            return result

        # Can't use functools.wraps() here because of bootstrap issues
        wrapper.__module__ = getattr(user_function, '__module__')
        wrapper.__doc__ = getattr(user_function, '__doc__')
        wrapper.__name__ = getattr(user_function, '__name__')
        # Carry over the qualified name too, so the wrapper does not show up
        # as "recursive_repr.<locals>...wrapper".  Skipped when the wrapped
        # callable has no __qualname__, to stay backward-compatible.
        qualname = getattr(user_function, '__qualname__', None)
        if qualname is not None:
            wrapper.__qualname__ = qualname
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
hub.py 文件源码 项目:Flask-SocketIO 作者: cutedogspark 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, loop=None, default=None):
        """Set up the hub greenlet and the event loop it drives.

        If *loop* has a ``run`` attribute it is used as the loop as-is, and
        passing *default* as well raises ``TypeError``.  Otherwise a loop is
        built from ``self.loop_class`` using *loop* (or ``self.backend``
        when *loop* is None) as its flags.
        """
        greenlet.__init__(self)
        loop_is_ready = hasattr(loop, 'run')
        if loop_is_ready and default is not None:
            raise TypeError("Unexpected argument: default")
        if loop_is_ready:
            self.loop = loop
        else:
            if default is None:
                # Off the main thread, do not ask for the default loop.
                if get_ident() != MAIN_THREAD:
                    default = False
            loop_factory = _import(self.loop_class)
            flags = self.backend if loop is None else loop
            self.loop = loop_factory(flags=flags, default=default)
        self._resolver = None
        self._threadpool = None
        self.format_context = _import(self.format_context)
reprlib.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def recursive_repr(fillvalue='...'):
    """Build a decorator that short-circuits re-entrant repr calls.

    While the decorated one-argument function is running for an object on
    the current thread, a nested call for that same object returns
    *fillvalue* instead of calling the function again.
    """

    def decorating_function(user_function):
        # (object id, thread ident) pairs with a repr call in flight.
        in_progress = set()

        def wrapper(self):
            marker = (id(self), get_ident())
            if marker in in_progress:
                return fillvalue
            in_progress.add(marker)
            try:
                return user_function(self)
            finally:
                in_progress.discard(marker)

        # Can't use functools.wraps() here because of bootstrap issues
        for attr in ('__module__', '__doc__', '__name__', '__qualname__'):
            setattr(wrapper, attr, getattr(user_function, attr))
        wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
        return wrapper

    return decorating_function
_dummy_thread.py 文件源码 项目:python- 作者: secondtonone1 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def get_ident():
    """Return a fixed identifier, standing in for _thread.get_ident().

    This module is meant to be used only when _threadmodule is not
    available.  In that situation the current process is assumed to be the
    only thread, so a single constant is enough to identify it.
    """
    sole_thread_ident = -1
    return sole_thread_ident
local.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self):
        """Create the empty storage dict and record the ident function.

        Both attributes are written through ``object.__setattr__`` rather
        than plain assignment (presumably because the class customises
        ``__setattr__`` -- confirm against the class definition).
        """
        raw_setattr = object.__setattr__
        raw_setattr(self, '__storage__', {})
        raw_setattr(self, '__ident_func__', get_ident)
local.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def get_ident(self):
        """Return the identifier of the current context.

        This is the value the local objects use internally to tell
        contexts apart.  Overriding this method does not change their
        behavior; it exists so that other context local objects (such as
        SQLAlchemy's scoped sessions) can be linked to the Werkzeug locals.

        .. versionchanged:: 0.7
           A different ident function can be passed to the local manager;
           it is then propagated to all the locals passed to the
           constructor.
        """
        current_ident = self.ident_func()
        return current_ident
local.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self):
        """Create the empty storage dict and record the ident function.

        Both attributes are written through ``object.__setattr__`` rather
        than plain assignment (presumably because the class customises
        ``__setattr__`` -- confirm against the class definition).
        """
        raw_setattr = object.__setattr__
        raw_setattr(self, '__storage__', {})
        raw_setattr(self, '__ident_func__', get_ident)
local.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def get_ident(self):
        """Return the identifier of the current context.

        This is the value the local objects use internally to tell
        contexts apart.  Overriding this method does not change their
        behavior; it exists so that other context local objects (such as
        SQLAlchemy's scoped sessions) can be linked to the Werkzeug locals.

        .. versionchanged:: 0.7
           A different ident function can be passed to the local manager;
           it is then propagated to all the locals passed to the
           constructor.
        """
        current_ident = self.ident_func()
        return current_ident
local.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self):
        """Create the empty storage dict and record the ident function.

        Both attributes are written through ``object.__setattr__`` rather
        than plain assignment (presumably because the class customises
        ``__setattr__`` -- confirm against the class definition).
        """
        raw_setattr = object.__setattr__
        raw_setattr(self, '__storage__', {})
        raw_setattr(self, '__ident_func__', get_ident)
local.py 文件源码 项目:zanph 作者: zanph 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_ident(self):
        """Return the identifier of the current context.

        This is the value the local objects use internally to tell
        contexts apart.  Overriding this method does not change their
        behavior; it exists so that other context local objects (such as
        SQLAlchemy's scoped sessions) can be linked to the Werkzeug locals.

        .. versionchanged:: 0.7
           A different ident function can be passed to the local manager;
           it is then propagated to all the locals passed to the
           constructor.
        """
        current_ident = self.ident_func()
        return current_ident
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 63 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Flag the calling thread as sending, then take the send lock."""
        # Socket I/O is about to happen; raising is_sending first keeps
        # debug events from being delivered while we are inside the
        # standard library.
        sender = get_thread_from_id(thread.get_ident())
        if sender is not None:
            sender.is_sending = True

        send_lock.acquire()
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, tb):
        """Release the send lock and clear the thread's sending flag.

        If the guarded body raised, all threads and the process are
        detached and True is returned so the exception is suppressed.
        """
        send_lock.release()

        # debug events may be delivered to this thread again
        sender = get_thread_from_id(thread.get_ident())
        if sender is not None:
            sender.is_sending = False

        if exc_type is None:
            return

        detach_threads()
        detach_process()
        # swallow the exception, we're no longer debugging
        return True
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        *id* is the thread identifier to record; when it is None the
        identifier of the calling thread is used instead.
        """
        self.id = thread.get_ident() if id is None else id

        # Map each trace event name to the matching handle_<event> method.
        event_names = ('call', 'line', 'return', 'exception',
                       'c_call', 'c_return', 'c_exception')
        self._events = dict(
            (name, getattr(self, 'handle_' + name)) for name in event_names
        )

        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None

        # _block_lock starts out held.
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()

        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False

        # Store the bound method on the instance so it does not have to be
        # re-created on every access.
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def block(self, block_lambda, keep_stopped_on_line = False):
        """Block the current thread until the debugger resumes it.

        block_lambda is called with no arguments while
        _block_starting_lock is held, right after the thread has been
        flagged as blocked.  Unless keep_stopped_on_line is true,
        stopped_on_line is updated to the current frame's line number
        before blocking.
        """
        assert not self._is_blocked
        #assert self.id == thread.get_ident(), 'wrong thread identity' + str(self.id) + ' ' + str(thread.get_ident())    # we should only ever block ourselves

        # send thread frames before we block
        self.enum_thread_frames_locally()

        if not keep_stopped_on_line:
            self.stopped_on_line = self.cur_frame.f_lineno

        # need to synchronize w/ sending the reason we're blocking
        self._block_starting_lock.acquire()
        self._is_blocked = True
        block_lambda()
        self._block_starting_lock.release()

        # Wait on _block_lock until released (presumably by unblock() on
        # another thread -- confirm).  The loop ends once the process is
        # DETACHED or a wake-up arrives with no pending unblock_work.
        while not DETACHED:
            self._block_lock.acquire()
            if self.unblock_work is None:
                break

            # the debugger wants us to do something, do it, and then block again
            self._is_working = True
            self.unblock_work()
            self.unblock_work = None
            self._is_working = False

        # Clear the blocked flag under the same lock that guarded setting it.
        self._block_starting_lock.acquire()
        assert self._is_blocked
        self._is_blocked = False
        self._block_starting_lock.release()
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def unblock(self):
        """Release this thread's block lock so that it can continue to run.

        Must be called while the thread is blocked, and from a thread other
        than the one being unblocked.
        """
        assert self._is_blocked
        # only someone else should unblock us
        assert thread.get_ident() != self.id

        self._block_lock.release()
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write(self, data):
        """Forward *data* to the wrapped buffer, mirroring it to the debugger.

        While not DETACHED, the UTF-8 decoded text is sent over ``conn``
        under the send lock as an OUTP message, followed by the calling
        thread's identifier and the text.  The raw *data* is always
        written to ``self.buffer`` afterwards.
        """
        if not DETACHED:
            probe_stack(3)
            decoded_text, _consumed = utf_8.decode(data)
            with _SendLockCtx:
                write_bytes(conn, OUTP)
                write_int(conn, thread.get_ident())
                write_string(conn, decoded_text)
        self.buffer.write(data)
attach_server.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def break_into_debugger():
    """Break into the attached PTVS remote debugger, if there is one.

    When a debugger is attached, execution of all threads is paused and the
    debugger is entered with the current thread as the active one.  When
    none is attached this is a no-op.
    """
    if vspd.DETACHED:
        return
    vspd.SEND_BREAK_COMPLETE = thread.get_ident()
    vspd.mark_all_threads_for_break()
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __enter__(self):
        """Flag the calling thread as sending, then take the send lock."""
        # Socket I/O is about to happen; raising is_sending first keeps
        # debug events from being delivered while we are inside the
        # standard library.
        sender = get_thread_from_id(thread.get_ident())
        if sender is not None:
            sender.is_sending = True

        send_lock.acquire()
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, tb):
        """Release the send lock and clear the thread's sending flag.

        If the guarded body raised, all threads and the process are
        detached and True is returned so the exception is suppressed.
        """
        send_lock.release()

        # debug events may be delivered to this thread again
        sender = get_thread_from_id(thread.get_ident())
        if sender is not None:
            sender.is_sending = False

        if exc_type is None:
            return

        detach_threads()
        detach_process()
        # swallow the exception, we're no longer debugging
        return True
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        *id* is the thread identifier to record; when it is None the
        identifier of the calling thread is used instead.
        """
        self.id = thread.get_ident() if id is None else id

        # Map each trace event name to the matching handle_<event> method.
        event_names = ('call', 'line', 'return', 'exception',
                       'c_call', 'c_return', 'c_exception')
        self._events = dict(
            (name, getattr(self, 'handle_' + name)) for name in event_names
        )

        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None

        # _block_lock starts out held.
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()

        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False

        # Store the bound method on the instance so it does not have to be
        # re-created on every access.
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []


问题


面经


文章

微信
公众号

扫码关注公众号