python类reported_process_loaded()的实例源码

visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.
        """
        if id is None:
            id = thread.get_ident()
        self.id = id

        # Event name -> bound handler method.
        self._events = {
            'call': self.handle_call,
            'line': self.handle_line,
            'return': self.handle_return,
            'exception': self.handle_exception,
            'c_call': self.handle_c_call,
            'c_return': self.handle_c_return,
            'c_exception': self.handle_c_exception,
        }

        # Frame / stepping state.
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.stopped_on_line = None
        self.django_stepping = None

        # Blocking state.  _block_lock is stored already acquired.
        held_lock = thread.allocate_lock()
        held_lock.acquire()
        self._block_lock = held_lock
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.unblock_work = None

        # Misc flags.
        self.detach = False
        self.reported_process_loaded = False
        self.is_sending = False

        # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break(skip_thread = self)

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def command_resume_thread(self):
        """Resume the thread whose id is read from the debugger connection.

        The id is read from ``self.conn`` with ``read_int`` and looked up in
        ``THREADS`` while holding ``THREADS_LOCK``.  If the thread's
        ``reported_process_loaded`` flag is set, the flag is cleared and
        ``self.command_resume_all()`` is called; otherwise only that thread's
        ``unblock()`` is called.

        Any exception raised by the ``THREADS`` lookup (e.g. for an id that is
        not registered) still propagates to the caller.
        """
        tid = read_int(self.conn)

        # try/finally guarantees THREADS_LOCK is released even when the lookup
        # raises; without it a failed lookup would leave the lock held and
        # block every later acquire().
        THREADS_LOCK.acquire()
        try:
            thread = THREADS[tid]
        finally:
            THREADS_LOCK.release()

        if thread.reported_process_loaded:
            thread.reported_process_loaded = False
            self.command_resume_all()
        else:
            thread.unblock()
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.
        """
        if id is None:
            id = thread.get_ident()
        self.id = id

        # Event name -> bound handler method.
        self._events = {
            'call': self.handle_call,
            'line': self.handle_line,
            'return': self.handle_return,
            'exception': self.handle_exception,
            'c_call': self.handle_c_call,
            'c_return': self.handle_c_return,
            'c_exception': self.handle_c_exception,
        }

        # Frame / stepping state.
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.stopped_on_line = None
        self.django_stepping = None

        # Blocking state.  _block_lock is stored already acquired.
        held_lock = thread.allocate_lock()
        held_lock.acquire()
        self._block_lock = held_lock
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.unblock_work = None

        # Misc flags.
        self.detach = False
        self.reported_process_loaded = False
        self.is_sending = False

        # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:pythonVSCode 作者: DonJayamanne 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break(skip_thread = self)

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:HomeAutomation 作者: gs2671 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.
        """
        if id is None:
            id = thread.get_ident()
        self.id = id

        # Event name -> bound handler method.
        self._events = {
            'call': self.handle_call,
            'line': self.handle_line,
            'return': self.handle_return,
            'exception': self.handle_exception,
            'c_call': self.handle_c_call,
            'c_return': self.handle_c_return,
            'c_exception': self.handle_c_exception,
        }

        # Frame / stepping state.
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.stopped_on_line = None
        self.django_stepping = None

        # Blocking state.  _block_lock is stored already acquired.
        held_lock = thread.allocate_lock()
        held_lock.acquire()
        self._block_lock = held_lock
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.unblock_work = None

        # Misc flags.
        self.detach = False
        self.reported_process_loaded = False
        self.is_sending = False

        # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:HomeAutomation 作者: gs2671 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break(skip_thread = self)

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:HomeAutomation 作者: gs2671 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def command_resume_thread(self):
        """Resume the thread whose id is read from the debugger connection.

        The id is read from ``self.conn`` with ``read_int`` and looked up in
        ``THREADS`` while holding ``THREADS_LOCK``.  If the thread's
        ``reported_process_loaded`` flag is set, the flag is cleared and
        ``self.command_resume_all()`` is called; otherwise only that thread's
        ``unblock()`` is called.

        Any exception raised by the ``THREADS`` lookup (e.g. for an id that is
        not registered) still propagates to the caller.
        """
        tid = read_int(self.conn)

        # try/finally guarantees THREADS_LOCK is released even when the lookup
        # raises; without it a failed lookup would leave the lock held and
        # block every later acquire().
        THREADS_LOCK.acquire()
        try:
            thread = THREADS[tid]
        finally:
            THREADS_LOCK.release()

        if thread.reported_process_loaded:
            thread.reported_process_loaded = False
            self.command_resume_all()
        else:
            thread.unblock()
visualstudio_py_debugger.py 文件源码 项目:AutoDiff 作者: icewall 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break()`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break()

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:AutoDiff 作者: icewall 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def command_resume_thread(self):
        """Resume the thread whose id is read from the debugger connection.

        The id is read from ``self.conn`` with ``read_int`` and looked up in
        ``THREADS`` while holding ``THREADS_LOCK``.  If the thread's
        ``reported_process_loaded`` flag is set, the flag is cleared and
        ``self.command_resume_all()`` is called; otherwise only that thread's
        ``unblock()`` is called.

        Any exception raised by the ``THREADS`` lookup (e.g. for an id that is
        not registered) still propagates to the caller.
        """
        tid = read_int(self.conn)

        # try/finally guarantees THREADS_LOCK is released even when the lookup
        # raises; without it a failed lookup would leave the lock held and
        # block every later acquire().
        THREADS_LOCK.acquire()
        try:
            thread = THREADS[tid]
        finally:
            THREADS_LOCK.release()

        if thread.reported_process_loaded:
            thread.reported_process_loaded = False
            self.command_resume_all()
        else:
            thread.unblock()
visualstudio_py_debugger.py 文件源码 项目:xidian-sfweb 作者: Gear420 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.
        """
        if id is None:
            id = thread.get_ident()
        self.id = id

        # Event name -> bound handler method.
        self._events = {
            'call': self.handle_call,
            'line': self.handle_line,
            'return': self.handle_return,
            'exception': self.handle_exception,
            'c_call': self.handle_c_call,
            'c_return': self.handle_c_return,
            'c_exception': self.handle_c_exception,
        }

        # Frame / stepping state.
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.stopped_on_line = None
        self.django_stepping = None

        # Blocking state.  _block_lock is stored already acquired.
        held_lock = thread.allocate_lock()
        held_lock.acquire()
        self._block_lock = held_lock
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.unblock_work = None

        # Misc flags.
        self.detach = False
        self.reported_process_loaded = False
        self.is_sending = False

        # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:xidian-sfweb 作者: Gear420 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break(skip_thread = self)

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:xidian-sfweb 作者: Gear420 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def command_resume_thread(self):
        """Resume the thread whose id is read from the debugger connection.

        The id is read from ``self.conn`` with ``read_int`` and looked up in
        ``THREADS`` while holding ``THREADS_LOCK``.  If the thread's
        ``reported_process_loaded`` flag is set, the flag is cleared and
        ``self.command_resume_all()`` is called; otherwise only that thread's
        ``unblock()`` is called.

        Any exception raised by the ``THREADS`` lookup (e.g. for an id that is
        not registered) still propagates to the caller.
        """
        tid = read_int(self.conn)

        # try/finally guarantees THREADS_LOCK is released even when the lookup
        # raises; without it a failed lookup would leave the lock held and
        # block every later acquire().
        THREADS_LOCK.acquire()
        try:
            thread = THREADS[tid]
        finally:
            THREADS_LOCK.release()

        if thread.reported_process_loaded:
            thread.reported_process_loaded = False
            self.command_resume_all()
        else:
            thread.unblock()
visualstudio_py_debugger.py 文件源码 项目:skojjt 作者: martin-green 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.
        """
        if id is None:
            id = thread.get_ident()
        self.id = id

        # Event name -> bound handler method.
        self._events = {
            'call': self.handle_call,
            'line': self.handle_line,
            'return': self.handle_return,
            'exception': self.handle_exception,
            'c_call': self.handle_c_call,
            'c_return': self.handle_c_return,
            'c_exception': self.handle_c_exception,
        }

        # Frame / stepping state.
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.stopped_on_line = None
        self.django_stepping = None

        # Blocking state.  _block_lock is stored already acquired.
        held_lock = thread.allocate_lock()
        held_lock.acquire()
        self._block_lock = held_lock
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.unblock_work = None

        # Misc flags.
        self.detach = False
        self.reported_process_loaded = False
        self.is_sending = False

        # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:skojjt 作者: martin-green 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break(skip_thread = self)

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:skojjt 作者: martin-green 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def command_resume_thread(self):
        """Resume the thread whose id is read from the debugger connection.

        The id is read from ``self.conn`` with ``read_int`` and looked up in
        ``THREADS`` while holding ``THREADS_LOCK``.  If the thread's
        ``reported_process_loaded`` flag is set, the flag is cleared and
        ``self.command_resume_all()`` is called; otherwise only that thread's
        ``unblock()`` is called.

        Any exception raised by the ``THREADS`` lookup (e.g. for an id that is
        not registered) still propagates to the caller.
        """
        tid = read_int(self.conn)

        # try/finally guarantees THREADS_LOCK is released even when the lookup
        # raises; without it a failed lookup would leave the lock held and
        # block every later acquire().
        THREADS_LOCK.acquire()
        try:
            thread = THREADS[tid]
        finally:
            THREADS_LOCK.release()

        if thread.reported_process_loaded:
            thread.reported_process_loaded = False
            self.command_resume_all()
        else:
            thread.unblock()
visualstudio_py_debugger.py 文件源码 项目:DjangoWebProject 作者: wrkettlitz 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.
        """
        if id is None:
            id = thread.get_ident()
        self.id = id

        # Event name -> bound handler method.
        self._events = {
            'call': self.handle_call,
            'line': self.handle_line,
            'return': self.handle_return,
            'exception': self.handle_exception,
            'c_call': self.handle_c_call,
            'c_return': self.handle_c_return,
            'c_exception': self.handle_c_exception,
        }

        # Frame / stepping state.
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.stopped_on_line = None
        self.django_stepping = None

        # Blocking state.  _block_lock is stored already acquired.
        held_lock = thread.allocate_lock()
        held_lock.acquire()
        self._block_lock = held_lock
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.unblock_work = None

        # Misc flags.
        self.detach = False
        self.reported_process_loaded = False
        self.is_sending = False

        # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:DjangoWebProject 作者: wrkettlitz 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break(skip_thread = self)

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:DjangoWebProject 作者: wrkettlitz 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def command_resume_thread(self):
        """Resume the thread whose id is read from the debugger connection.

        The id is read from ``self.conn`` with ``read_int`` and looked up in
        ``THREADS`` while holding ``THREADS_LOCK``.  If the thread's
        ``reported_process_loaded`` flag is set, the flag is cleared and
        ``self.command_resume_all()`` is called; otherwise only that thread's
        ``unblock()`` is called.

        Any exception raised by the ``THREADS`` lookup (e.g. for an id that is
        not registered) still propagates to the caller.
        """
        tid = read_int(self.conn)

        # try/finally guarantees THREADS_LOCK is released even when the lookup
        # raises; without it a failed lookup would leave the lock held and
        # block every later acquire().
        THREADS_LOCK.acquire()
        try:
            thread = THREADS[tid]
        finally:
            THREADS_LOCK.release()

        if thread.reported_process_loaded:
            thread.reported_process_loaded = False
            self.command_resume_all()
        else:
            thread.unblock()
visualstudio_py_debugger.py 文件源码 项目:ApiRestPythonTest 作者: rvfvazquez 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.
        """
        if id is None:
            id = thread.get_ident()
        self.id = id

        # Event name -> bound handler method.
        self._events = {
            'call': self.handle_call,
            'line': self.handle_line,
            'return': self.handle_return,
            'exception': self.handle_exception,
            'c_call': self.handle_c_call,
            'c_return': self.handle_c_return,
            'c_exception': self.handle_c_exception,
        }

        # Frame / stepping state.
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.stopped_on_line = None
        self.django_stepping = None

        # Blocking state.  _block_lock is stored already acquired.
        held_lock = thread.allocate_lock()
        held_lock.acquire()
        self._block_lock = held_lock
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.unblock_work = None

        # Misc flags.
        self.detach = False
        self.reported_process_loaded = False
        self.is_sending = False

        # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.trace_func = self.trace_func
        self.prev_trace_func = None
        self.trace_func_stack = []

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_debugger.py 文件源码 项目:ApiRestPythonTest 作者: rvfvazquez 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def block_maybe_attach(self):
        """Call ``self.block`` with a callback that reports why we stopped.

        The stepping mode found on entry is captured and ``self.stepping`` is
        reset to STEPPING_NONE.  The callback handed to ``self.block`` then:

        * reports a finished step and returns the result of
          ``mark_all_threads_for_break`` when the captured mode is
          STEPPING_OVER or STEPPING_INTO;
        * otherwise, unless DETACHED is set, returns the result of
          ``report_process_loaded`` (first setting
          ``self.reported_process_loaded`` when the captured mode is
          STEPPING_ATTACH_BREAK).

        For STEPPING_ATTACH_BREAK only the first caller reports anything: the
        module-level ``attach_sent_break`` flag is tested and set under
        ``attach_lock``, and later callers get a callback that returns None.
        """
        global attach_sent_break

        should_report = True
        if self.stepping == STEPPING_ATTACH_BREAK:
            # only one thread should send the attach break in
            attach_lock.acquire()
            should_report = not attach_sent_break
            attach_sent_break = True
            attach_lock.release()

        probe_stack()

        # Capture the mode we arrived with before clearing it on the thread.
        mode_on_entry = self.stepping
        self.stepping = STEPPING_NONE

        def block_cond():
            if not should_report:
                return None

            finished_step = (mode_on_entry == STEPPING_OVER
                             or mode_on_entry == STEPPING_INTO)
            if finished_step:
                report_step_finished(self.id)
                return mark_all_threads_for_break(skip_thread = self)

            if DETACHED:
                return None

            if mode_on_entry == STEPPING_ATTACH_BREAK:
                self.reported_process_loaded = True
            return report_process_loaded(self.id)

        update_all_thread_stacks(self)
        self.block(block_cond)
visualstudio_py_debugger.py 文件源码 项目:ApiRestPythonTest 作者: rvfvazquez 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def command_resume_thread(self):
        """Resume the thread whose id is read from the debugger connection.

        The id is read from ``self.conn`` with ``read_int`` and looked up in
        ``THREADS`` while holding ``THREADS_LOCK``.  If the thread's
        ``reported_process_loaded`` flag is set, the flag is cleared and
        ``self.command_resume_all()`` is called; otherwise only that thread's
        ``unblock()`` is called.

        Any exception raised by the ``THREADS`` lookup (e.g. for an id that is
        not registered) still propagates to the caller.
        """
        tid = read_int(self.conn)

        # try/finally guarantees THREADS_LOCK is released even when the lookup
        # raises; without it a failed lookup would leave the lock held and
        # block every later acquire().
        THREADS_LOCK.acquire()
        try:
            thread = THREADS[tid]
        finally:
            THREADS_LOCK.release()

        if thread.reported_process_loaded:
            thread.reported_process_loaded = False
            self.command_resume_all()
        else:
            thread.unblock()
visualstudio_py_debugger.py 文件源码 项目:AutoDiff 作者: icewall 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        """Initialise the per-thread debugger state.

        id -- identifier stored as ``self.id``.  When it is None the value
              returned by ``thread.get_ident()`` at construction time is
              stored instead.

        When the module-level ``stackless`` name is not None this also
        registers a schedule callback and replaces ``__call__`` and
        ``settrace`` on ``stackless.tasklet`` (see the inline comments).
        """
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        # Event name -> bound handler method.
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        # _block_lock is acquired immediately, so it starts out held.
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None

        # stackless changes
        if stackless is not None:
            # Register self.context_dispatcher as the stackless schedule callback.
            stackless.set_schedule_callback(self.context_dispatcher)
            # the tasklets need to be traced on a case by case basis
            # sys.trace needs to be called within their calling context
            def __call__(tsk, *args, **kwargs):
                # Replacement for stackless.tasklet.__call__: remember the
                # tasklet's current tempval, swap in a wrapper that installs
                # self.trace_func around the call, then set the tasklet up
                # with the original tempval and the call arguments.
                f = tsk.tempval
                def new_f(old_f, args, kwargs):
                    # Enable tracing for the duration of old_f and always
                    # clear it afterwards.  Returns None when old_f is None.
                    sys.settrace(self.trace_func)
                    try:
                        if old_f is not None:
                            return old_f(*args, **kwargs)
                    finally:
                        sys.settrace(None)

                tsk.tempval = new_f
                stackless.tasklet.setup(tsk, f, args, kwargs)
                return tsk

            def settrace(tsk, tb):
                # Set the trace function on the tasklet's frame (when the
                # frame exposes f_trace) and via sys.settrace.
                if hasattr(tsk.frame, "f_trace"):
                    tsk.frame.f_trace = tb
                sys.settrace(tb)

            # NOTE(review): these assignments patch the stackless.tasklet
            # class itself and run on every construction, so a second
            # instance would save the already-patched __call__ here and the
            # installed __call__ would close over the newest self -- confirm
            # this is intended.
            self.__oldstacklesscall__ = stackless.tasklet.__call__
            stackless.tasklet.settrace = settrace
            stackless.tasklet.__call__ = __call__
        # Only the 'cli' platform gets an explicit frame list.
        if sys.platform == 'cli':
            self.frames = []


问题


面经


文章

微信
公众号

扫码关注公众号