def testDistrib(self):
    """
    Publish a resource over distrib and fetch it through a subscribing site.

    A static C{text/plain} resource is mounted at C{there} on a site wrapped
    in a L{distrib.ResourcePublisher}.  A second site mounts a
    L{distrib.ResourceSubscription} to that publisher at C{here}, and an
    HTTP C{GET} for C{/here/there} is issued against the second site.

    @return: A L{Deferred} which fires after the response body has been
        compared with C{'root'}.
    """
    # Publishing side: the static resource, served through a PB factory.
    publishedRoot = resource.Resource()
    publishedRoot.putChild("there", static.Data("root", "text/plain"))
    publisherSite = server.Site(publishedRoot)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(publisherSite))
    self.port1 = reactor.listenTCP(0, self.f1)

    # Subscribing side: an HTTP site whose "here" child is the subscription.
    publisherPort = self.port1.getHost().port
    self.sub = distrib.ResourceSubscription("127.0.0.1", publisherPort)
    subscriberRoot = resource.Resource()
    subscriberRoot.putChild("here", self.sub)
    subscriberSite = MySite(subscriberRoot)
    self.port2 = reactor.listenTCP(0, subscriberSite)

    # Port 0 was requested above, so read back the port actually bound.
    url = "http://127.0.0.1:%d/here/there" % (self.port2.getHost().port,)
    fetching = client.Agent(reactor).request(b"GET", url)
    fetching.addCallback(client.readBody)
    fetching.addCallback(self.assertEqual, 'root')
    return fetching
# Example source code for the Python class Agent()
def _requestTest(self, child, **kwargs):
    """
    Publish C{child} on a distrib site and fetch it back over HTTP.

    The server side is created by C{self._setupDistribServer}; the resource
    is then requested at C{/child} on the address that helper returns.

    @param child: The resource to publish using distrib.

    @param **kwargs: Extra keyword arguments forwarded to L{Agent.request}
        when requesting the resource.

    @return: A L{Deferred} which fires with the body of the response.
    """
    # Only the address half of the helper's result is needed here.
    _port, address = self._setupDistribServer(child)
    target = "http://%s:%s/child" % (address.host, address.port)
    fetching = client.Agent(reactor).request(b"GET", target, **kwargs)
    fetching.addCallback(client.readBody)
    return fetching
def _get_stats_from_node(self):
    """
    Request the stats document over HTTP and decode it as JSON.

    The URL is taken from the ``url`` config key, defaulting to
    ``http://<hostname>:8098/stats``; the User-Agent header is taken from
    the ``useragent`` config key.  If the response has a truthy length and
    status code 200 its body is read, otherwise an empty JSON object
    (``"{}"``) is decoded instead.

    This is a generator: it yields Deferreds and hands its result back via
    ``defer.returnValue``.
    NOTE(review): presumably wrapped by ``defer.inlineCallbacks`` -- the
    decorator is not visible in this chunk; confirm.
    """
    agent = Agent(reactor)
    fallback_url = 'http://%s:8098/stats' % self.hostname
    url = self.config.get('url', fallback_url)
    user_agent = self.config.get('useragent', 'Duct Riak stats checker')
    headers = Headers({'User-Agent': [user_agent]})

    response = yield agent.request('GET'.encode(), url.encode(), headers)

    # Default payload, replaced only when a usable response came back.
    body = "{}"
    if response.length and response.code == 200:
        delivered = defer.Deferred()
        response.deliverBody(BodyReceiver(delivered))
        received = yield delivered
        body = received.read()

    defer.returnValue(json.loads(body))
def network_kubernetes(**kw):
    """
    Build an ``IKubernetes`` provider from which clients can be created.

    All keyword arguments are forwarded unchanged to ``_NetworkKubernetes``.

    :param twisted.python.url.URL base_url: The root of the Kubernetes HTTPS
        API to interact with.

    :param twisted.web.iweb.IAgent agent: An HTTP agent used to issue
        requests.  Defaults to a new ``twisted.web.client.Agent`` instance.
        ``txkube.authenticate_with_serviceaccount`` and
        ``txkube.authenticate_with_certificate`` are helpers for creating
        agents that interact well with Kubernetes servers.

    :return IKubernetes: The Kubernetes service.
    """
    service = _NetworkKubernetes(**kw)
    return service
def authenticate_with_serviceaccount(reactor, **kw):
    """
    Build an ``IAgent`` that authenticates to a particular Kubernetes server
    with a service account token.

    The configuration is loaded with ``KubeConfig.from_service_account``;
    its token is sent as a ``Bearer`` ``authorization`` header on requests,
    and the HTTPS policy derived from the same configuration is used as the
    underlying agent's context factory.

    :param reactor: The reactor with which to configure the resulting agent.

    :param bytes path: The location of the service account directory.  The
        default should work fine for normal use within a container.

    :return IAgent: An agent which will authenticate itself to a particular
        Kubernetes server and which will verify that server or refuse to
        interact with it.
    """
    config = KubeConfig.from_service_account(**kw)
    tls_policy = https_policy_from_config(config)
    bearer_value = u"Bearer {}".format(config.user["token"])
    injected = Headers({u"authorization": [bearer_value]})
    return HeaderInjectingAgent(
        _to_inject=injected,
        _agent=Agent(reactor, contextFactory=tls_policy),
    )
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` from inside a coroutine.

    The coroutine is scheduled on ``self.io_loop``, then ``runner()`` is
    called; the coroutine calls ``self.stop_loop()`` once the body has been
    read.  Returns whatever the coroutine stored as the body (``None`` if
    it never got that far).
    """
    # One-element list so the nested coroutine can hand the body back out.
    holder = [None]

    @gen.coroutine
    def fetch():
        # Simpler than the non-coroutine version, but it cheats by reading
        # the body in one blob instead of streaming it with a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            holder[0] = yield readBody(response)
        self.stop_loop()

    self.io_loop.add_callback(fetch)
    runner()
    return holder[0]
def _pollForResult(self):
    """
    Issue one POST to ``AUTH_RESPONSE_URI`` asking for the auth result.

    Nothing is sent when the request has been canceled (a debug line is
    logged) or when the user code has expired (``_onError`` is called with
    ``ERROR_CREDENTIALS_REQUEST_EXPIRED``); both cases return ``None``.

    Otherwise the Deferred from the agent request is stored on
    ``self._responseDeferred`` and returned, with
    ``_onCredentialsPollResponse`` / ``_onCredentialsPollError`` attached
    as callback / errback.
    """
    if self._canceled:
        Log.d("Auth Request canceled")
        return None
    if self._user_code.expired():
        self._onError(self.ERROR_CREDENTIALS_REQUEST_EXPIRED)
        return None

    request_headers = Headers({
        'User-Agent' : [self.USER_AGENT],
        'Content-Type' : ["application/x-www-form-urlencoded"],
    })
    # Form body is assembled by plain string interpolation (no URL-encoding).
    form_fields = (
        self.CLIENT_ID,
        self.CLIENT_SECRET,
        str(self._user_code.device_code),
        self.GRANT_TYPE_DEVICE_AUTH,
    )
    form_body = "client_id=%s&client_secret=%s&code=%s&grant_type=%s" % form_fields

    pending = self._agent.request(
        'POST',
        self.AUTH_RESPONSE_URI,
        request_headers,
        StringProducer(form_body),
    )
    pending.addCallbacks(self._onCredentialsPollResponse, self._onCredentialsPollError)
    self._responseDeferred = pending
    return pending
def create_txacme_client_creator(reactor, url, key, alg=jose.RS256):
    """
    Build the client creator that is handed to the txacme service. See
    ``txacme.client.Client.from_url()``. The underlying JWSClient is created
    with a non-persistent pool to avoid
    https://github.com/mithrandi/txacme/issues/86.

    :return: a callable that returns a deferred that returns the client
    """
    # An Agent created without an explicit pool uses the default pool,
    # which is non-persistent.
    agent = Agent(reactor)
    http_client = HTTPClient(agent=agent)
    jws_client = JWSClient(http_client, key, alg)
    creator = partial(
        txacme_Client.from_url, reactor, url, key, alg, jws_client)
    return creator
def test_default_client(self):
    """
    A client handed to default_client comes back as the very same object.
    """
    supplied = treq_HTTPClient(Agent(reactor))
    returned = default_client(supplied, reactor)
    assert_that(returned, Is(supplied))
def default_client(client, reactor):
    """
    Return ``client`` unchanged unless it is ``None``; in that case build a
    ``treq_HTTPClient`` around a new ``twisted.web.client.Agent`` that uses
    the provided reactor.
    """
    if client is not None:
        return client

    # Imported lazily: only needed when a default has to be constructed.
    from twisted.web.client import Agent
    return treq_HTTPClient(Agent(reactor))
def twisted_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent``, streaming the body.

    The response body is delivered to a small ``Protocol`` that collects
    each chunk.  ``runner()`` is called to drive the request, and once the
    request finishes (successfully or not) ``self.stop_loop()`` is called
    if it exists.  Asserts that at least one chunk was received.

    Returns the concatenated body as a byte string.
    """
    # http://twistedmatrix.com/documents/current/web/howto/client.html
    chunks = []
    client = Agent(self.reactor)
    d = client.request(b'GET', utf8(url))

    class Accumulator(Protocol):
        """Append every received chunk to ``chunks``; fire on disconnect."""

        def __init__(self, finished):
            self.finished = finished

        def dataReceived(self, data):
            chunks.append(data)

        def connectionLost(self, reason):
            self.finished.callback(None)

    def callback(response):
        # Chain on a Deferred that fires once the whole body has arrived.
        finished = Deferred()
        response.deliverBody(Accumulator(finished))
        return finished
    d.addCallback(callback)

    def shutdown(failure):
        if hasattr(self, 'stop_loop'):
            self.stop_loop()
        elif failure is not None:
            # loop hasn't been initialized yet; try our best to
            # get an error message out. (the runner() interaction
            # should probably be refactored).
            try:
                failure.raiseException()
            except Exception:
                logging.error('exception before starting loop', exc_info=True)
    d.addBoth(shutdown)
    runner()
    self.assertTrue(chunks)
    # Body chunks are bytes; joining them with a text '' fails on Python 3.
    # b'' is the same object type as '' on Python 2, so this is compatible.
    return b''.join(chunks)
def twisted_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent``, streaming the body.

    The response body is delivered to a small ``Protocol`` that collects
    each chunk.  ``runner()`` is called to drive the request, and once the
    request finishes (successfully or not) ``self.stop_loop()`` is called
    if it exists.  Asserts that at least one chunk was received.

    Returns the concatenated body as a byte string.
    """
    # http://twistedmatrix.com/documents/current/web/howto/client.html
    chunks = []
    client = Agent(self.reactor)
    d = client.request(b'GET', utf8(url))

    class Accumulator(Protocol):
        """Append every received chunk to ``chunks``; fire on disconnect."""

        def __init__(self, finished):
            self.finished = finished

        def dataReceived(self, data):
            chunks.append(data)

        def connectionLost(self, reason):
            self.finished.callback(None)

    def callback(response):
        # Chain on a Deferred that fires once the whole body has arrived.
        finished = Deferred()
        response.deliverBody(Accumulator(finished))
        return finished
    d.addCallback(callback)

    def shutdown(failure):
        if hasattr(self, 'stop_loop'):
            self.stop_loop()
        elif failure is not None:
            # loop hasn't been initialized yet; try our best to
            # get an error message out. (the runner() interaction
            # should probably be refactored).
            try:
                failure.raiseException()
            except Exception:
                logging.error('exception before starting loop', exc_info=True)
    d.addBoth(shutdown)
    runner()
    self.assertTrue(chunks)
    # Body chunks are bytes; joining them with a text '' fails on Python 3.
    # b'' is the same object type as '' on Python 2, so this is compatible.
    return b''.join(chunks)
def twisted_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent``, streaming the body.

    The response body is delivered to a small ``Protocol`` that collects
    each chunk.  ``runner()`` is called to drive the request, and once the
    request finishes (successfully or not) ``self.stop_loop()`` is called
    if it exists.  Asserts that at least one chunk was received.

    Returns the concatenated body as a byte string.
    """
    # http://twistedmatrix.com/documents/current/web/howto/client.html
    chunks = []
    client = Agent(self.reactor)
    d = client.request(b'GET', utf8(url))

    class Accumulator(Protocol):
        """Append every received chunk to ``chunks``; fire on disconnect."""

        def __init__(self, finished):
            self.finished = finished

        def dataReceived(self, data):
            chunks.append(data)

        def connectionLost(self, reason):
            self.finished.callback(None)

    def callback(response):
        # Chain on a Deferred that fires once the whole body has arrived.
        finished = Deferred()
        response.deliverBody(Accumulator(finished))
        return finished
    d.addCallback(callback)

    def shutdown(failure):
        if hasattr(self, 'stop_loop'):
            self.stop_loop()
        elif failure is not None:
            # loop hasn't been initialized yet; try our best to
            # get an error message out. (the runner() interaction
            # should probably be refactored).
            try:
                failure.raiseException()
            except Exception:
                logging.error('exception before starting loop', exc_info=True)
    d.addBoth(shutdown)
    runner()
    self.assertTrue(chunks)
    # Body chunks are bytes; joining them with a text '' fails on Python 3.
    # b'' is the same object type as '' on Python 2, so this is compatible.
    return b''.join(chunks)
def __init__(self, reactor, url, pool=None, timeout=None, connect_timeout=None):
    """
    Store the connection settings and create the HTTP agent.

    :param reactor: Twisted reactor to use.
    :type reactor: class

    :param url: etcd URL, eg `http://localhost:2379`
    :type url: str

    :param pool: Twisted Web agent connection pool. When falsy, a new
        persistent ``HTTPConnectionPool`` on ``reactor`` is created.
    :type pool:

    :param timeout: If given, a global request timeout used for all
        requests to etcd.
    :type timeout: float or None

    :param connect_timeout: If given, a global connection timeout used when
        opening a new HTTP connection to etcd.
    :type connect_timeout: float or None

    :raises TypeError: if ``url`` is not a text (unicode) string.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of the text type.
    if not isinstance(url, six.text_type):
        raise TypeError('url must be of type unicode, was {}'.format(type(url)))
    self._url = url
    self._timeout = timeout
    self._pool = pool or HTTPConnectionPool(reactor, persistent=True)
    # Turn off the pool factory's "noisy" flag.
    self._pool._factory.noisy = False
    self._agent = Agent(reactor, connectTimeout=connect_timeout, pool=self._pool)
def cookieAgentFactory(verify_path, connectTimeout=30):
    """
    Build a cookie-aware HTTP agent whose HTTPS policy is created from the
    PEM certificate stored at ``verify_path``.

    :param verify_path: path of the PEM file that is loaded and handed to
        ``BrowserLikePolicyForHTTPS``.
    :param connectTimeout: connect timeout passed to the underlying
        ``Agent`` (default 30).
    :return: a ``CookieAgent`` wrapping the ``Agent``, with a fresh
        ``CookieJar``.
    """
    pem_data = FilePath(verify_path).getContent()
    certificate = Certificate.loadPEM(pem_data)
    https_policy = BrowserLikePolicyForHTTPS(certificate)
    base_agent = Agent(reactor, https_policy, connectTimeout=connectTimeout)
    jar = cookielib.CookieJar()
    return CookieAgent(base_agent, jar)
def __init__(self):
    """
    Start with zeroed request/error counters, a new ``Agent`` on the
    reactor, and no callbacks or request path set.
    """
    # Both counters start from zero.
    self.request_count = self.error_count = 0
    self.agent = Agent(reactor)
    # Nothing is registered or requested yet.
    self.completion_callback = None
    self.error_callback = None
    self.request_path = None
def __init__(self, email):
    """
    Remember the e-mail address and prepare for autodiscovery.

    :param email: address containing an ``@``; the piece following the
        first ``@`` (up to any further ``@``) is stored as
        ``email_domain``.
    """
    self.email = email
    address_parts = email.split("@")
    self.email_domain = address_parts[1]
    self.agent = Agent(reactor)
    self.state = AutoDiscover.STATE_INIT
    # URLs already followed, consulted by redirect handling.
    self.redirect_urls = list()
def handle_redirect(self, new_url):
    """
    Follow a redirect by issuing a new GET request to ``new_url``.

    The URL is recorded in ``self.redirect_urls`` and the state is set to
    ``AutoDiscover.STATE_REDIRECT`` before the request is made.

    :param new_url: the URL to request next.
    :return: the Deferred from the agent request, with
        ``autodiscover_response`` / ``autodiscover_error`` attached.
    :raises Exception: ``("AutoDiscover", "Circular redirection")`` if
        ``new_url`` has already been followed.
    """
    if new_url in self.redirect_urls:
        raise Exception("AutoDiscover", "Circular redirection")
    self.redirect_urls.append(new_url)
    self.state = AutoDiscover.STATE_REDIRECT
    # Single pre-formatted argument: prints the same text whether print is
    # the Python 2 statement or the Python 3 function.
    print("Making request to %s" % (new_url,))
    d = self.agent.request(
        'GET',
        new_url,
        Headers({'User-Agent': ['python-EAS-Client %s' % version]}),
        AutoDiscoveryProducer(self.email))
    d.addCallback(self.autodiscover_response)
    d.addErrback(self.autodiscover_error)
    return d
def autodiscover(self):
    """
    Advance to the next autodiscover state and issue its request.

    ``self.state`` is incremented first.  If the new state has an entry in
    ``AutoDiscover.AD_REQUESTS``, that entry is formatted with
    ``self.email_domain`` to obtain the URL, which is then requested.

    :return: the Deferred from the agent request, with
        ``autodiscover_response`` / ``autodiscover_error`` attached.
    :raises Exception: ``("Unsupported state", <state>)`` when the new
        state has no entry in ``AutoDiscover.AD_REQUESTS``.
    """
    self.state += 1
    if self.state not in AutoDiscover.AD_REQUESTS:
        raise Exception("Unsupported state", str(self.state))

    # Format the URL once; it is used for both the log line and the request.
    url = AutoDiscover.AD_REQUESTS[self.state] % self.email_domain
    # Single pre-formatted argument: prints the same text whether print is
    # the Python 2 statement or the Python 3 function.
    print("Making request to %s" % (url,))
    body = AutoDiscoveryProducer(self.email)
    d = self.agent.request(
        'GET',
        url,
        Headers({'User-Agent': ['python-EAS-Client %s' % version]}),
        body)
    d.addCallback(self.autodiscover_response)
    d.addErrback(self.autodiscover_error)
    return d
def get_options(self):
    """
    Send an HTTP OPTIONS request to ``self.get_url()``.

    The request carries a User-Agent header and an Authorization header
    built from ``self.authorization_header()``, and has no body.  When
    ``self.verbose`` is set, the URL and the authorization header are
    printed first.

    :return: the Deferred from the agent request, with
        ``options_response`` / ``activesync_error`` attached.
    """
    if self.verbose:
        # Single pre-formatted argument: prints the same text whether print
        # is the Python 2 statement or the Python 3 function.
        # NOTE(review): this writes the Authorization header to stdout.
        print("Options, get URL: %s Authorization %s"
              % (self.get_url(), self.authorization_header()))
    d = self.agent.request(
        'OPTIONS',
        self.get_url(),
        Headers({'User-Agent': ['python-EAS-Client %s' % version],
                 'Authorization': [self.authorization_header()]}),
        None)
    d.addCallback(self.options_response)
    d.addErrback(self.activesync_error)
    return d