python类Agent()的实例源码

test_distrib.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def testDistrib(self):
        # site1 is the publisher
        r1 = resource.Resource()
        r1.putChild("there", static.Data("root", "text/plain"))
        site1 = server.Site(r1)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
        self.port1 = reactor.listenTCP(0, self.f1)
        self.sub = distrib.ResourceSubscription("127.0.0.1",
                                                self.port1.getHost().port)
        r2 = resource.Resource()
        r2.putChild("here", self.sub)
        f2 = MySite(r2)
        self.port2 = reactor.listenTCP(0, f2)
        agent = client.Agent(reactor)
        d = agent.request(b"GET", "http://127.0.0.1:%d/here/there" % \
                          (self.port2.getHost().port,))
        d.addCallback(client.readBody)
        d.addCallback(self.assertEqual, 'root')
        return d
test_distrib.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _requestTest(self, child, **kwargs):
        """
        Set up a resource on a distrib site using L{ResourcePublisher} and
        then retrieve it from a L{ResourceSubscription} via an HTTP client.

        @param child: The resource to publish using distrib.
        @param **kwargs: Extra keyword arguments to pass to L{Agent.request} when
            requesting the resource.

        @return: A L{Deferred} which fires with the result of the request.
        """
        mainPort, mainAddr = self._setupDistribServer(child)
        agent = client.Agent(reactor)
        url = "http://%s:%s/child" % (mainAddr.host, mainAddr.port)
        d = agent.request(b"GET", url, **kwargs)
        d.addCallback(client.readBody)
        return d
riak.py 文件源码 项目:duct 作者: ducted 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _get_stats_from_node(self):
        agent = Agent(reactor)

        url = self.config.get('url', 'http://%s:8098/stats' % self.hostname)
        ua = self.config.get('useragent', 'Duct Riak stats checker')

        headers = Headers({'User-Agent': [ua]})
        request = yield agent.request('GET'.encode(), url.encode(), headers)

        if (request.length) and (request.code == 200):
            d = defer.Deferred()
            request.deliverBody(BodyReceiver(d))
            b = yield d
            body = b.read()
        else:
            body = "{}"

        defer.returnValue(json.loads(body))
_network.py 文件源码 项目:txkube 作者: LeastAuthority 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def network_kubernetes(**kw):
    """
    Create a new ``IKubernetes`` provider which can be used to create clients.

    :param twisted.python.url.URL base_url: The root of the Kubernetes HTTPS
        API to interact with.

    :param twisted.web.iweb.IAgent agent: An HTTP agent to use to issue
        requests.  Defaults to a new ``twisted.web.client.Agent`` instance.
        See ``txkube.authenticate_with_serviceaccount`` and
        ``txkube.authenticate_with_certificate`` for helpers for creating
        agents that interact well with Kubernetes servers.

    :return IKubernetes: The Kubernetes service.
    """
    return _NetworkKubernetes(**kw)
_authentication.py 文件源码 项目:txkube 作者: LeastAuthority 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def authenticate_with_serviceaccount(reactor, **kw):
    """
    Create an ``IAgent`` which can issue authenticated requests to a
    particular Kubernetes server using a service account token.

    :param reactor: The reactor with which to configure the resulting agent.

    :param bytes path: The location of the service account directory.  The
        default should work fine for normal use within a container.

    :return IAgent: An agent which will authenticate itself to a particular
        Kubernetes server and which will verify that server or refuse to
        interact with it.
    """
    config = KubeConfig.from_service_account(**kw)
    policy = https_policy_from_config(config)
    token = config.user["token"]
    agent = HeaderInjectingAgent(
        _to_inject=Headers({u"authorization": [u"Bearer {}".format(token)]}),
        _agent=Agent(reactor, contextFactory=policy),
    )
    return agent
twisted_test.py 文件源码 项目:PyQYT 作者: collinsctk 项目源码 文件源码 阅读 43 收藏 0 点赞 0 评论 0
def twisted_coroutine_fetch(self, url, runner):
        body = [None]

        @gen.coroutine
        def f():
            # This is simpler than the non-coroutine version, but it cheats
            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
            self.stop_loop()
        self.io_loop.add_callback(f)
        runner()
        return body[0]
YoutubeAuth.py 文件源码 项目:enigma2-plugins 作者: opendreambox 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _pollForResult(self):
        if self._canceled:
            Log.d("Auth Request canceled")
            return
        if self._user_code.expired():
            self._onError(self.ERROR_CREDENTIALS_REQUEST_EXPIRED)
            return
        d = self._agent.request(
            'POST',
            self.AUTH_RESPONSE_URI,
            Headers({
                'User-Agent' : [self.USER_AGENT],
                'Content-Type' : ["application/x-www-form-urlencoded"],
            }),
            StringProducer("client_id=%s&client_secret=%s&code=%s&grant_type=%s" % (self.CLIENT_ID, self.CLIENT_SECRET, str(self._user_code.device_code), self.GRANT_TYPE_DEVICE_AUTH))
        )
        d.addCallbacks(self._onCredentialsPollResponse, self._onCredentialsPollError)
        self._responseDeferred = d
        return d;
acme_util.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def create_txacme_client_creator(reactor, url, key, alg=jose.RS256):
    """
    Create a creator for txacme clients to provide to the txacme service. See
    ``txacme.client.Client.from_url()``. We create the underlying JWSClient
    with a non-persistent pool to avoid
    https://github.com/mithrandi/txacme/issues/86.

    :return: a callable that returns a deffered that returns the client
    """
    # Creating an Agent without specifying a pool gives us the default pool
    # which is non-persistent.
    jws_client = JWSClient(HTTPClient(agent=Agent(reactor)), key, alg)

    return partial(txacme_Client.from_url, reactor, url, key, alg, jws_client)
test_clients.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_default_client(self):
        """
        When default_client is passed a client it should return that client.
        """
        client = treq_HTTPClient(Agent(reactor))

        assert_that(default_client(client, reactor), Is(client))
clients.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def default_client(client, reactor):
    """
    Set up a default client if one is not provided. Set up the default
    ``twisted.web.client.Agent`` using the provided reactor.
    """
    if client is None:
        from twisted.web.client import Agent
        client = treq_HTTPClient(Agent(reactor))

    return client
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
twisted_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def twisted_fetch(self, url, runner):
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except:
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return ''.join(chunks)
_client_tx.py 文件源码 项目:txaio-etcd 作者: crossbario 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __init__(self, reactor, url, pool=None, timeout=None, connect_timeout=None):
        """

        :param rector: Twisted reactor to use.
        :type reactor: class

        :param url: etcd URL, eg `http://localhost:2379`
        :type url: str

        :param pool: Twisted Web agent connection pool
        :type pool:

        :param timeout: If given, a global request timeout used for all
            requests to etcd.
        :type timeout: float or None

        :param connect_timeout: If given, a global connection timeout used when
            opening a new HTTP connection to etcd.
        :type connect_timeout: float or None
        """
        if type(url) != six.text_type:
            raise TypeError('url must be of type unicode, was {}'.format(type(url)))
        self._url = url
        self._timeout = timeout
        self._pool = pool or HTTPConnectionPool(reactor, persistent=True)
        self._pool._factory.noisy = False
        self._agent = Agent(reactor, connectTimeout=connect_timeout, pool=self._pool)
_http.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def cookieAgentFactory(verify_path, connectTimeout=30):
    customPolicy = BrowserLikePolicyForHTTPS(
        Certificate.loadPEM(FilePath(verify_path).getContent()))
    agent = Agent(reactor, customPolicy, connectTimeout=connectTimeout)
    cookiejar = cookielib.CookieJar()
    return CookieAgent(agent, cookiejar)
stats_client.py 文件源码 项目:sawtooth-validator 作者: hyperledger-archives 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __init__(self):
        self.request_count = 0
        self.error_count = 0
        self.agent = Agent(reactor)
        self.completion_callback = None
        self.error_callback = None
        self.request_path = None
autodiscovery.py 文件源码 项目:peas 作者: mwrlabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, email):
        self.email = email
        self.email_domain = email.split("@")[1]
        self.agent = Agent(reactor)
        self.state = AutoDiscover.STATE_INIT
        self.redirect_urls = []
autodiscovery.py 文件源码 项目:peas 作者: mwrlabs 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def handle_redirect(self, new_url):
        if new_url in self.redirect_urls:
            raise Exception("AutoDiscover", "Circular redirection")
        self.redirect_urls.append(new_url)
        self.state = AutoDiscover.STATE_REDIRECT
        print "Making request to",new_url
        d = self.agent.request(
            'GET',
            new_url,
            Headers({'User-Agent': ['python-EAS-Client %s'%version]}),
            AutoDiscoveryProducer(self.email))
        d.addCallback(self.autodiscover_response)
        d.addErrback(self.autodiscover_error)
        return d
autodiscovery.py 文件源码 项目:peas 作者: mwrlabs 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def autodiscover(self):
        self.state += 1
        if self.state in AutoDiscover.AD_REQUESTS:
            print "Making request to",AutoDiscover.AD_REQUESTS[self.state]%self.email_domain
            body = AutoDiscoveryProducer(self.email)
            d = self.agent.request(
                'GET',
                AutoDiscover.AD_REQUESTS[self.state]%self.email_domain,
                Headers({'User-Agent': ['python-EAS-Client %s'%version]}),
                body)
            d.addCallback(self.autodiscover_response)
            d.addErrback(self.autodiscover_error)
            return d
        else:
            raise Exception("Unsupported state",str(self.state))
activesync.py 文件源码 项目:peas 作者: mwrlabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def get_options(self):
        if self.verbose: print "Options, get URL:",self.get_url(),"Authorization",self.authorization_header()
        d = self.agent.request(
            'OPTIONS',
            self.get_url(),
            Headers({'User-Agent': ['python-EAS-Client %s'%version], 'Authorization': [self.authorization_header()]}),
            None)
        d.addCallback(self.options_response)
        d.addErrback(self.activesync_error)
        return d


问题


面经


文章

微信
公众号

扫码关注公众号