btask.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:utils 作者: jianbing 项目源码 文件源码
def stop(cls, task_name):
        if task_name not in cls._tasks:
            raise Exception("task {} is not exists".format(task_name))
        tid = cls._tasks[task_name]
        exc_type = SystemExit
        tid = ctypes.c_long(tid)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exc_type))
        if res == 0:
            raise ValueError("invalid thread id?the task may be already stop")
        elif res != 1:
            # """if it returns a number greater than one, you're in trouble,
            # and you should call it again with exc=NULL to revert the effect"""
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")
        del cls._tasks[task_name]
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号