python类execute()的实例源码

amp.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __getattr__(self, name):
        """
        Pass attributes through to the real L{BrokerServer}, after checking
        that they're encodable with AMP.
        """
        original = getattr(self.broker_server, name, None)
        if (name in get_remote_methods(self.broker_server) and
            original is not None and
            callable(original)
            ):
            def method(*args, **kwargs):
                for arg in args:
                    assert MethodCallArgument.check(arg)
                for k, v in iteritems(kwargs):
                    assert MethodCallArgument.check(v)
                return execute(original, *args, **kwargs)
            return method
        else:
            raise AttributeError(name)
tapestry.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def loadModel(self, path, request):
        """Load a model, for the given path and request.

        @rtype: L{Deferred}
        """
        from twisted.internet.defer import execute
        return execute(self.loadModelNow, path, request)
tapestry.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def loadModel(self, path, request):
        """Load a model, for the given path and request.

        @rtype: L{Deferred}
        """
        from twisted.internet.defer import execute
        return execute(self.loadModelNow, path, request)
storage.py 文件源码 项目:reflector-cluster 作者: lbryio 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, redis_address):
        self.db = get_redis_connection(redis_address)
        if redis_address == 'fake':
            # fakeredis is not thread safe
            self.defer_func = defer.execute
        else:
            self.defer_func = threads.deferToThread
endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def listen(self, stdioProtocolFactory):
        """
        Implement L{IStreamServerEndpoint.listen} to listen on stdin/stdout
        """
        return defer.execute(self._stdio,
                             stdioProtocolFactory.buildProtocol(PipeAddress()),
                             reactor=self._reactor)
endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def listen(self, protocolFactory):
        """
        Implement L{IStreamServerEndpoint.listen} to listen on a TCP
        socket
        """
        return defer.execute(self._reactor.listenTCP,
                             self._port,
                             protocolFactory,
                             backlog=self._backlog,
                             interface=self._interface)
endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def listen(self, protocolFactory):
        """
        Implement L{IStreamServerEndpoint.listen} to listen for SSL on a
        TCP socket.
        """
        return defer.execute(self._reactor.listenSSL, self._port,
                             protocolFactory,
                             contextFactory=self._sslContextFactory,
                             backlog=self._backlog,
                             interface=self._interface)
endpoints.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def listen(self, protocolFactory):
        """
        Implement L{IStreamServerEndpoint.listen} to listen on a UNIX socket.
        """
        return defer.execute(self._reactor.listenUNIX, self._address,
                             protocolFactory,
                             backlog=self._backlog,
                             mode=self._mode,
                             wantPID=self._wantPID)


问题


面经


文章

微信
公众号

扫码关注公众号