python类ETIME的实例源码

timeout.py 文件源码 项目:open-nti 作者: Juniper 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
common.py 文件源码 项目:geppetto 作者: datosio 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def decorator(minutes=1, error_message=os.strerror(errno.ETIME)):
        def dec(func):
            def _handle_timeout(signum, frame):
                msg = 'Timeout Error: %s' % (error_message)
                add_test_note(msg)
                raise TimeoutException(error_message)

            def wrapper(*args, **kwargs):
                if minutes > 0:
                    signal.signal(signal.SIGALRM, _handle_timeout)
                    signal.alarm(int(minutes * 60))
                try:
                    result = func(*args, **kwargs)
                finally:
                    signal.alarm(0)
                return result

            return wraps(func)(wrapper)

        return dec
base_test.py 文件源码 项目:sshaolin 作者: bucknerns 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator


# special test case for running ssh commands at module level
utils.py 文件源码 项目:steth 作者: openstack 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
test_dask.py 文件源码 项目:knit 作者: dask 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
copernicus.py 文件源码 项目:Copernicus 作者: Soroboruo 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
itest_utils.py 文件源码 项目:dcos-metronome-python 作者: tsukaby 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
timeout.py 文件源码 项目:TISP 作者: kaayy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """returns a decorator controlling the running time of the decorated function.

    Throws a TimeoutError exception if the decorated function does not return in given time.
    """
    def decorator(func):
        """the decorator."""
        def _handle_timeout(_signum, _frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            """wraps the function"""
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
utils.py 文件源码 项目:leviathan 作者: leviathan-framework 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def timeout(seconds=30, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
__init__.py 文件源码 项目:kael 作者: 360skyeye 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
timeout_dec.py 文件源码 项目:AlexaBot 作者: jacobajit 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def timeout_dec(seconds=20, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
utils.py 文件源码 项目:nfvbench 作者: opnfv 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(_signum, _frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result

        return wraps(func)(wrapper)

    return decorator
sub.py 文件源码 项目:GLaDOS2 作者: TheComet 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            signal.signal(signal.SIGALRM, _handle_timeout)
            signal.setitimer(signal.ITIMER_REAL,seconds) #used timer instead of alarm
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
            return result
        return wraps(func)(wrapper)
    return decorator
test_packetizer.py 文件源码 项目:obsoleted-vpduserv 作者: InfraSIM 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_3_closed(self):
        rsock = LoopSocket()
        wsock = LoopSocket()
        rsock.link(wsock)
        p = Packetizer(wsock)
        p.set_log(util.get_logger('paramiko.transport'))
        p.set_hexdump(True)
        cipher = AES.new(zero_byte * 16, AES.MODE_CBC, x55 * 16)
        p.set_outbound_cipher(cipher, 16, sha1, 12, x1f * 20)

        # message has to be at least 16 bytes long, so we'll have at least one
        # block of data encrypted that contains zero random padding bytes
        m = Message()
        m.add_byte(byte_chr(100))
        m.add_int(100)
        m.add_int(1)
        m.add_int(900)
        wsock.send = lambda x: 0
        from functools import wraps
        import errno
        import os
        import signal

        class TimeoutError(Exception):
            pass

        def timeout(seconds=1, error_message=os.strerror(errno.ETIME)):
            def decorator(func):
                def _handle_timeout(signum, frame):
                    raise TimeoutError(error_message)

                def wrapper(*args, **kwargs):
                    signal.signal(signal.SIGALRM, _handle_timeout)
                    signal.alarm(seconds)
                    try:
                        result = func(*args, **kwargs)
                    finally:
                        signal.alarm(0)
                    return result

                return wraps(func)(wrapper)

            return decorator
        send = timeout()(p.send_message)
        self.assertRaises(EOFError, send, m)


问题


面经


文章

微信
公众号

扫码关注公众号