def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` driven from a Tornado coroutine.

    The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
    called; presumably ``runner`` blocks until ``self.stop_loop()`` is
    invoked (verify against the callers).

    :param url: URL to GET; converted with ``utf8`` before the request.
    :param runner: zero-argument callable invoked after scheduling.
    :return: the response body, or ``None`` if the fetch failed before the
        body was read.
    """
    # One-element list so the nested coroutine can assign the result.
    body = [None]

    @gen.coroutine
    def f():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        try:
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
        finally:
            # Stop the loop on failure too; otherwise an error in the
            # request or body read would leave runner() waiting forever.
            self.stop_loop()
    self.io_loop.add_callback(f)
    runner()
    return body[0]
python类Agent()的实例源码
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` driven from a Tornado coroutine.

    The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
    called; presumably ``runner`` blocks until ``self.stop_loop()`` is
    invoked (verify against the callers).

    :param url: URL to GET; converted with ``utf8`` before the request.
    :param runner: zero-argument callable invoked after scheduling.
    :return: the response body, or ``None`` if the fetch failed before the
        body was read.
    """
    # One-element list so the nested coroutine can assign the result.
    body = [None]

    @gen.coroutine
    def f():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        try:
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
        finally:
            # Stop the loop on failure too; otherwise an error in the
            # request or body read would leave runner() waiting forever.
            self.stop_loop()
    self.io_loop.add_callback(f)
    runner()
    return body[0]
def get_request(self, path, ccb=None, ecb=None):
    """Issue a GET for ``path`` through ``self.agent``.

    The chosen callbacks and the path are stored on the instance before the
    request is made.

    :param path: request target, handed to the agent unchanged.
    :param ccb: completion callback to store; ``self._completion_default``
        is stored when ``None``.
    :param ecb: error callback to store; ``self._error_default`` is stored
        when ``None``.
    :return: the Deferred from ``self.agent.request`` with
        ``self._handle_request`` and ``self._handle_error`` attached.
    """
    if ccb is None:
        self.completion_callback = self._completion_default
    else:
        self.completion_callback = ccb

    if ecb is None:
        self.error_callback = self._error_default
    else:
        self.error_callback = ecb

    self.request_path = path

    request_headers = Headers({'User-Agent': ['sawtooth stats collector']})
    deferred = self.agent.request('GET', path, request_headers, None)
    deferred.addCallback(self._handle_request)
    deferred.addErrback(self._handle_error)
    return deferred
def request_validate(self, url, user, realm, password):
    """
    POST a form-encoded authentication request for a user in a realm to the
    given privacyIDEA /validate/check endpoint.

    :param url: an HTTP or HTTPS url to the /validate/check endpoint
    :param user: username to authenticate
    :param realm: realm of the user, empty string for default realm
    :param password: password for authentication
    :return: A Twisted Deferred which yields a `twisted.web.client.Response` instance or fails.
    """
    form_fields = {'user': user,
                   'realm': realm,
                   'pass': password}
    encoded_form = urllib.urlencode(form_fields)
    # TODO: Is this really the preferred way to pass a string body?
    body_producer = FileBodyProducer(StringIO(encoded_form))
    request_headers = Headers({
        'Content-Type': ['application/x-www-form-urlencoded'],
        'User-Agent': ['privacyIDEA-LDAP-Proxy']
    })
    return self.factory.agent.request('POST',
                                      url,
                                      request_headers,
                                      body_producer)
def __init__(self, domain, username, pw, server, use_ssl, policy_key=0, server_version="14.0", device_type="iPhone", device_id=None, verbose=False):
    """Store the connection settings and set up the agent and operation queue.

    :param domain: stored as ``self.domain``.
    :param username: stored as ``self.username``.
    :param pw: stored as ``self.password``.
    :param server: stored as ``self.server``.
    :param use_ssl: stored as ``self.use_ssl``.
    :param policy_key: stored as ``self.policy_key``.
    :param server_version: stored as ``self.server_version``.
    :param device_type: stored as ``self.device_type``.
    :param device_id: stored as ``self.device_id``; when falsy, a random
        32-character hex id is generated instead.
    :param verbose: stored as ``self.verbose``.
    """
    # Connection and account settings.
    self.use_ssl = use_ssl
    self.domain = domain
    self.username = username
    self.password = pw
    self.server = server

    # Device identity; uuid4().hex is the dash-free 32-character form.
    self.device_id = device_id or uuid.uuid4().hex
    self.server_version = server_version
    self.device_type = device_type
    self.policy_key = policy_key

    # Per-instance state, initially empty.
    self.folder_data = {}
    self.verbose = verbose
    self.collection_data = {}

    context_factory = WebClientContextFactory()
    self.agent = Agent(reactor, context_factory)

    # Wait for the first queued operation and hand it to queue_full.
    self.operation_queue = defer.DeferredQueue()
    self.queue_deferred = self.operation_queue.get()
    self.queue_deferred.addCallback(self.queue_full)
# Response processing
def provision(self):
    """POST a ``Provision`` command to the server.

    :return: the request Deferred, with ``wbxml_response``,
        ``process_policy_key`` and ``acknowledge`` attached as callbacks (in
        that order) and ``activesync_error`` as the errback.
    """
    base_url = self.get_url()
    query = {"Cmd": "Provision",
             "User": self.username,
             "DeviceId": self.device_id,
             "DeviceType": self.device_type}
    prov_url = self.add_parameters(base_url, query)

    request_headers = Headers({
        'User-Agent': ['python-EAS-Client %s' % version],
        'Authorization': [self.authorization_header()],
        'MS-ASProtocolVersion': [self.server_version],
        'X-MS-PolicyKey': [str(self.policy_key)],
        'Content-Type': ["application/vnd.ms-sync.wbxml"],
    })
    body_producer = ProvisionProducer(verbose=self.verbose)

    deferred = self.agent.request('POST', prov_url, request_headers,
                                  body_producer)
    deferred.addCallback(self.wbxml_response)
    deferred.addCallback(self.process_policy_key)
    deferred.addCallback(self.acknowledge)
    deferred.addErrback(self.activesync_error)
    return deferred
def folder_sync(self, sync_key=0):
    """POST a ``FolderSync`` command to the server.

    :param sync_key: sync key to send; when it is 0 and
        ``self.folder_data`` holds a ``"key"`` entry, that stored key is
        used instead.
    :return: the request Deferred, with ``wbxml_response`` and
        ``process_folder_sync`` attached as callbacks and
        ``activesync_error`` as the errback.
    """
    use_stored_key = sync_key == 0 and "key" in self.folder_data
    if use_stored_key:
        sync_key = self.folder_data["key"]

    base_url = self.get_url()
    query = {"Cmd": "FolderSync",
             "User": self.username,
             "DeviceId": self.device_id,
             "DeviceType": self.device_type}
    sync_url = self.add_parameters(base_url, query)

    request_headers = Headers({
        'User-Agent': ['python-EAS-Client %s' % version],
        'Authorization': [self.authorization_header()],
        'MS-ASProtocolVersion': [self.server_version],
        'X-MS-PolicyKey': [str(self.policy_key)],
        'Content-Type': ["application/vnd.ms-sync.wbxml"],
    })
    body_producer = FolderSyncProducer(sync_key, verbose=self.verbose)

    deferred = self.agent.request('POST', sync_url, request_headers,
                                  body_producer)
    deferred.addCallback(self.wbxml_response)
    deferred.addCallback(self.process_folder_sync)
    deferred.addErrback(self.activesync_error)
    return deferred
def sync(self, collectionId, sync_key=0, get_body=False):
    """POST a ``Sync`` command for one collection to the server.

    :param collectionId: collection to sync; also passed on to
        ``process_sync``.
    :param sync_key: sync key to send; when it is 0 and
        ``self.collection_data`` has an entry for ``collectionId``, that
        entry's ``"key"`` is used instead.
    :param get_body: forwarded to ``SyncProducer``.
    :return: the request Deferred, with ``wbxml_response`` and
        ``process_sync`` attached as callbacks and ``activesync_error`` as
        the errback.
    """
    use_stored_key = sync_key == 0 and collectionId in self.collection_data
    if use_stored_key:
        sync_key = self.collection_data[collectionId]["key"]

    base_url = self.get_url()
    query = {"Cmd": "Sync",
             "User": self.username,
             "DeviceId": self.device_id,
             "DeviceType": self.device_type}
    sync_url = self.add_parameters(base_url, query)

    request_headers = Headers({
        'User-Agent': ['python-EAS-Client %s' % version],
        'Authorization': [self.authorization_header()],
        'MS-ASProtocolVersion': [self.server_version],
        'X-MS-PolicyKey': [str(self.policy_key)],
        'Content-Type': ["application/vnd.ms-sync.wbxml"],
    })
    body_producer = SyncProducer(collectionId, sync_key, get_body,
                                 verbose=self.verbose)

    deferred = self.agent.request('POST', sync_url, request_headers,
                                  body_producer)
    deferred.addCallback(self.wbxml_response)
    deferred.addCallback(self.process_sync, collectionId)
    deferred.addErrback(self.activesync_error)
    return deferred
def __init__(self):
    """Initialise the cloud link item and load the device UUID.

    Raises ``RuntimeError`` when either
    ``CloudLinkSettings.PRIVATE_KEY_LOCATION`` or
    ``CloudLinkSettings.UUID_LOCATION`` is ``None``. If the UUID file cannot
    be read, a warning is logged and ``self.uuid`` is set to ``""``.
    """
    parlay.ParlayCommandItem.__init__(self, "parlay.items.cloud_link", "Cloud Link")
    self._http_agent = Agent(self._reactor)
    self.channel_uri = None
    self.cloud_factory = None

    # Both settings are mandatory; check them in a fixed order.
    required_settings = (
        ("PRIVATE_KEY_LOCATION",
         "CloudLinkSettings.PRIVATE_KEY_LOCATION must be set for cloud to work"),
        ("UUID_LOCATION",
         "CloudLinkSettings.UUID_LOCATION must be set for cloud to work"),
    )
    for setting_name, message in required_settings:
        if getattr(CloudLinkSettings, setting_name) is None:
            raise RuntimeError(message)

    try:
        with open(CloudLinkSettings.UUID_LOCATION, 'r') as handle:
            self.uuid = handle.read()
    except IOError:
        logger.warn("Error reading UUID file. Has this device been registered?")
        self.uuid = ""
def http_request(self, path="/", encode='zlib'):
    """
    Do an HTTP GET request on the broker's local HTTP port.

    :param path: path appended to ``http://localhost:<broker http_port>``
    :type path: str
    :param encode: when equal to ``"zlib"`` the body is zlib-compressed and
        then base64-encoded; any other value leaves the body unchanged
    :type encode: str
    :return: Deferred firing with the (possibly encoded) response body
    """
    import zlib

    url = "http://localhost:" + str(Broker.get_instance().http_port) + path
    # Parenthesised form parses on both Python 2 and Python 3.
    print(url)

    def encode_body(html):
        # zlib.compress is the operation behind the Python-2-only "zlib"
        # str codec that the original ``html.encode(encode)`` relied on.
        if encode == "zlib":
            return base64.b64encode(zlib.compress(html))
        return html

    request = self._http_agent.request(
        'GET',
        url,
        Headers({'User-Agent': ['Twisted Web Client']}),
        None)
    request.addCallback(readBody)
    request.addCallback(encode_body)
    return request
twisted_test.py 文件源码
项目:My-Web-Server-Framework-With-Python2.7
作者: syjsu
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` driven from a Tornado coroutine.

    The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
    called; presumably ``runner`` blocks until ``self.stop_loop()`` is
    invoked (verify against the callers).

    :param url: URL to GET; converted with ``utf8`` before the request.
    :param runner: zero-argument callable invoked after scheduling.
    :return: the response body, or ``None`` if the fetch failed before the
        body was read.
    """
    # One-element list so the nested coroutine can assign the result.
    body = [None]

    @gen.coroutine
    def f():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        try:
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
        finally:
            # Stop the loop on failure too; otherwise an error in the
            # request or body read would leave runner() waiting forever.
            self.stop_loop()
    self.io_loop.add_callback(f)
    runner()
    return body[0]
@defer.inlineCallbacks
def testValidOptionsRequest(self):
    """
    Makes sure that a "regular" OPTIONS request doesn't include the CORS
    specific headers in the response.

    The body yields the Deferred returned by ``agent.request``, so it has to
    run under ``defer.inlineCallbacks``; as a bare generator function none
    of the assertions would execute.
    """
    agent = Agent(reactor)
    headers = Headers({'origin': ['http://localhost']})
    response = yield agent.request('OPTIONS', self.uri, headers)
    # Check we get the correct status.
    self.assertEqual(http.OK, response.code)
    # Check we get the correct length
    self.assertEqual(0, response.length)
    # Check we get the right headers back
    self.assertTrue(response.headers.hasHeader('Allow'))
    # None of the CORS-specific headers may be present.
    for corsHeader in ('Access-Control-Allow-Origin',
                       'Access-Control-Max-Age',
                       'Access-Control-Allow-Credentials',
                       'Access-Control-Allow-Methods'):
        self.assertFalse(response.headers.hasHeader(corsHeader))
@defer.inlineCallbacks
def testViaAgent(self):
    """
    This is a manual check of a POST to /objects which uses
    L{twisted.web.client.Agent} to make the request. We do not use
    txFluidDB because we need to check that a Location header is
    received and that we receive both a 'URI' and an 'id' in the JSON
    response payload.

    The body yields Deferreds, so it has to run under
    ``defer.inlineCallbacks``; as a bare generator function none of the
    assertions would execute.
    """
    URI = self.txEndpoint.getRootURL() + defaults.httpObjectCategoryName
    basicAuth = 'Basic %s' % b64encode('%s:%s' % ('testuser1', 'secret'))
    headers = Headers({'accept': ['application/json'],
                       'authorization': [basicAuth]})
    agent = Agent(reactor)
    response = yield agent.request('POST', URI, headers)
    self.assertEqual(http.CREATED, response.code)
    self.assertTrue(response.headers.hasHeader('location'))

    # Collect the body: ResponseGetter is handed the Deferred that is
    # awaited below for the body text.
    d = defer.Deferred()
    bodyGetter = ResponseGetter(d)
    response.deliverBody(bodyGetter)
    body = yield d

    responseDict = json.loads(body)
    self.assertIn('URI', responseDict)
    self.assertIn('id', responseDict)
@defer.inlineCallbacks
def testQueryUnicodePath(self):
    """A query on a non-existent Unicode tag, should 404. Part of the
    point here is to make sure that no other error occurs due to
    passing in a Unicode tag path.

    The body yields the Deferred returned by ``agent.request``, so it has to
    run under ``defer.inlineCallbacks``; as a bare generator function the
    assertion would never execute.
    """
    # NOTE(review): the '???' may be non-ASCII characters lost to a bad
    # encoding round-trip -- confirm against the upstream file.
    path = u'çóñ/???'
    query = '%s = "hi"' % path
    URI = '%s/%s?query=%s' % (
        self.endpoint,
        defaults.httpObjectCategoryName,
        urllib.quote(query.encode('utf-8')))
    headers = Headers({'accept': ['application/json']})
    agent = Agent(reactor)
    response = yield agent.request('GET', URI, headers)
    self.assertEqual(http.NOT_FOUND, response.code)
@defer.inlineCallbacks
def testValidCORSRequest(self):
    """
    Sanity check to make sure we get the valid headers back for a CORS
    based request.

    The body yields the Deferred returned by ``agent.request``, so it has to
    run under ``defer.inlineCallbacks``; as a bare generator function none
    of the assertions would execute.
    """
    agent = Agent(reactor)
    headers = Headers()
    # The origin to use in the tests
    dummy_origin = 'http://foo.com'
    headers.addRawHeader('Origin', dummy_origin)
    response = yield agent.request('GET', self.uri, headers)
    # Check we get the correct status.
    self.assertEqual(http.OK, response.code)
    # Check we get the right headers back
    self.assertTrue(
        response.headers.hasHeader('Access-Control-Allow-Origin'))
    self.assertTrue(
        response.headers.hasHeader('Access-Control-Allow-Credentials'))
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn(
        dummy_origin,
        response.headers.getRawHeaders('Access-Control-Allow-Origin'))
@defer.inlineCallbacks
def testVersionGets404(self):
    """
    Version numbers used to be able to be given in API calls, but are
    no longer supported.

    The body yields the Deferred returned by ``agent.request``, so it has to
    run under ``defer.inlineCallbacks``; as a bare generator function the
    assertion would never execute.
    """
    version = 20100808
    URI = '%s/%d/%s/%s' % (
        self.endpoint,
        version,
        defaults.httpNamespaceCategoryName,
        defaults.adminUsername)
    headers = Headers({'accept': ['application/json']})
    agent = Agent(reactor)
    response = yield agent.request('GET', URI, headers)
    self.assertEqual(http.NOT_FOUND, response.code)
# TODO: Add a test for a namespace that we don't have LIST perm on.
# although that might be done in permissions.py when that finally gets
# added.
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` driven from a Tornado coroutine.

    The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
    called; presumably ``runner`` blocks until ``self.stop_loop()`` is
    invoked (verify against the callers).

    :param url: URL to GET; converted with ``utf8`` before the request.
    :param runner: zero-argument callable invoked after scheduling.
    :return: the response body, or ``None`` if the fetch failed before the
        body was read.
    """
    # One-element list so the nested coroutine can assign the result.
    body = [None]

    @gen.coroutine
    def f():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        try:
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
        finally:
            # Stop the loop on failure too; otherwise an error in the
            # request or body read would leave runner() waiting forever.
            self.stop_loop()
    self.io_loop.add_callback(f)
    runner()
    return body[0]
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` driven from a Tornado coroutine.

    The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
    called; presumably ``runner`` blocks until ``self.stop_loop()`` is
    invoked (verify against the callers).

    :param url: URL to GET; converted with ``utf8`` before the request.
    :param runner: zero-argument callable invoked after scheduling.
    :return: the response body, or ``None`` if the fetch failed before the
        body was read.
    """
    # One-element list so the nested coroutine can assign the result.
    body = [None]

    @gen.coroutine
    def f():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        try:
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
        finally:
            # Stop the loop on failure too; otherwise an error in the
            # request or body read would leave runner() waiting forever.
            self.stop_loop()
    self.io_loop.add_callback(f)
    runner()
    return body[0]
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` driven from a Tornado coroutine.

    The coroutine is scheduled on ``self.io_loop`` and ``runner`` is then
    called; presumably ``runner`` blocks until ``self.stop_loop()`` is
    invoked (verify against the callers).

    :param url: URL to GET; converted with ``utf8`` before the request.
    :param runner: zero-argument callable invoked after scheduling.
    :return: the response body, or ``None`` if the fetch failed before the
        body was read.
    """
    # One-element list so the nested coroutine can assign the result.
    body = [None]

    @gen.coroutine
    def f():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        try:
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
        finally:
            # Stop the loop on failure too; otherwise an error in the
            # request or body read would leave runner() waiting forever.
            self.stop_loop()
    self.io_loop.add_callback(f)
    runner()
    return body[0]
def test_nonPersistent(self):
    """
    When the L{HTTPConnectionPool} is created with C{persistent} set to
    C{False}, the C{Request}s issued through it carry a C{persistent} flag
    of C{False}.

    Elsewhere in the tests for the underlying HTTP code we ensure that
    this will result in the disconnection of the HTTP protocol once the
    request is done, so that the connection will not be returned to the
    pool.
    """
    def endpointStub(*args):
        # The test case itself stands in for the endpoint.
        return self

    nonPersistentPool = HTTPConnectionPool(self.reactor, persistent=False)
    agent = client.Agent(self.reactor, pool=nonPersistentPool)
    agent._getEndpoint = endpointStub
    agent.request(b"GET", b"http://127.0.0.1")

    firstRequest = self.protocol.requests[0][0]
    self.assertEqual(firstRequest.persistent, False)
def test_headersUnmodified(self):
    """
    The L{Headers} instance handed to L{Agent.request} is left untouched
    even when a I{Host} header has to be added to the request.
    """
    def endpointStub(*args):
        # The test case itself stands in for the endpoint.
        return self

    suppliedHeaders = http_headers.Headers()
    self.agent._getEndpoint = endpointStub
    self.agent.request(b'GET', b'http://example.com/foo', suppliedHeaders)

    # The request should have been issued.
    issuedRequests = self.protocol.requests
    self.assertEqual(len(issuedRequests), 1)
    # And the headers object passed in should not have changed.
    self.assertEqual(suppliedHeaders, http_headers.Headers())
def makeEndpoint(self, host=b'example.com', port=443):
    """
    Build an L{Agent} and return the endpoint it creates for an https URI
    made from the given host and port.

    @param host: The host for the endpoint.
    @type host: L{bytes}
    @param port: The port for the endpoint.
    @type port: L{int}
    @return: An endpoint of an L{Agent} constructed according to args.
    @rtype: L{SSL4ClientEndpoint}
    """
    agent = client.Agent(self.createReactor())
    uriBytes = b"".join([b'https://', host, b":", intToBytes(port), b"/"])
    return agent._getEndpoint(URI.fromBytes(uriBytes))
def test_deprecatedDuckPolicy(self):
    """
    Passing something that duck-types I{like} a L{web client context
    factory <twisted.web.client.WebClientContextFactory>} - something that
    does not provide L{IPolicyForHTTPS} - to L{Agent} emits a
    L{DeprecationWarning} even if you don't actually C{import
    WebClientContextFactory} to do it.
    """
    def warnMe():
        client.Agent(MemoryReactorClock(),
                     "does-not-provide-IPolicyForHTTPS")
    warnMe()

    # Exactly one warning, attributed to warnMe, is expected.
    emitted = self.flushWarnings([warnMe])
    self.assertEqual(len(emitted), 1)
    onlyWarning = emitted[0]
    self.assertEqual(onlyWarning['category'], DeprecationWarning)

    expectedMessage = (
        "'does-not-provide-IPolicyForHTTPS' was passed as the HTTPS "
        "policy for an Agent, but it does not provide IPolicyForHTTPS. "
        "Since Twisted 14.0, you must pass a provider of IPolicyForHTTPS."
    )
    self.assertEqual(onlyWarning['message'], expectedMessage)
def integrationTest(self, hostName, expectedAddress, addressType):
    """
    Wrap L{AgentTestsMixin.integrationTest} with TLS.

    The server factory is wrapped in a L{TLSMemoryBIOFactory}, and the
    agent is given an HTTPS policy that trusts the authority generated for
    C{hostName}.
    """
    asciiHostName = hostName.decode('ascii')
    authority, server = certificatesForAuthorityAndServer(asciiHostName)

    def tlsify(serverFactory):
        return TLSMemoryBIOFactory(server.options(), False, serverFactory)

    def tlsagent(reactor):
        from twisted.web.iweb import IPolicyForHTTPS
        from zope.interface import implementer

        @implementer(IPolicyForHTTPS)
        class Policy(object):
            def creatorForNetloc(self, hostname, port):
                return optionsForClientTLS(hostname.decode("ascii"),
                                           trustRoot=authority)
        return client.Agent(reactor, contextFactory=Policy())

    parentIntegrationTest = super(AgentHTTPSTests, self).integrationTest
    parentIntegrationTest(hostName, expectedAddress, addressType,
                          serverWrapper=tlsify,
                          createAgent=tlsagent,
                          scheme=b'https')
def test_noRedirect(self):
    """
    When the response is not a redirect, L{client.RedirectAgent} behaves
    like L{client.Agent}.
    """
    deferred = self.agent.request(b'GET', b'http://example.com/foo')

    # Take the pending request and fire its result with a plain 200.
    _request, resultDeferred = self.protocol.requests.pop()
    okResponse = Response(
        (b'HTTP', 1, 1), 200, b'OK', http_headers.Headers(), None)
    resultDeferred.callback(okResponse)

    # No further request may have been issued.
    self.assertEqual(0, len(self.protocol.requests))
    result = self.successResultOf(deferred)
    self.assertIdentical(okResponse, result)
    self.assertIdentical(result.previousResponse, None)
def test_protectedServerAndDate(self):
    """
    I{Server} and I{Date} headers emitted by the CGI script are ignored.
    """
    def checkResponse(response):
        responseHeaders = response.headers
        self.assertNotIn('monkeys',
                         responseHeaders.getRawHeaders('server'))
        self.assertNotIn('last year',
                         responseHeaders.getRawHeaders('date'))

    scriptPath = self.writeCGI(SPECIAL_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)
    deferred = client.Agent(reactor).request(b"GET", url)
    deferred.addCallback(discardBody)
    deferred.addCallback(checkResponse)
    return deferred
def test_noDuplicateContentTypeHeaders(self):
    """
    When the CGI script emits its own I{content-type} header, the server
    must not add a second (duplicate) one, as per ticket 4786.
    """
    def checkResponse(response):
        contentTypes = response.headers.getRawHeaders('content-type')
        self.assertEqual(contentTypes, ['text/cgi-duplicate-test'])
        return response

    scriptPath = self.writeCGI(NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)
    deferred = client.Agent(reactor).request(b"GET", url)
    deferred.addCallback(discardBody)
    deferred.addCallback(checkResponse)
    return deferred
def test_duplicateHeaderCGI(self):
    """
    Two instances of the same header emitted by a CGI script are both sent
    in the response.
    """
    def checkResponse(response):
        headerValues = response.headers.getRawHeaders('header')
        self.assertEqual(headerValues, ['spam', 'eggs'])

    scriptPath = self.writeCGI(DUAL_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)
    deferred = client.Agent(reactor).request(b"GET", url)
    deferred.addCallback(discardBody)
    deferred.addCallback(checkResponse)
    return deferred
def test_malformedHeaderCGI(self):
    """
    A malformed header emitted by the CGI script is reported in the log.
    """
    scriptPath = self.writeCGI(BROKEN_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)
    deferred = client.Agent(reactor).request(b"GET", url)
    deferred.addCallback(discardBody)

    # Record the text of every log event from here until test cleanup.
    capturedText = []

    def addMessage(eventDict):
        capturedText.append(log.textFromEventDict(eventDict))

    log.addObserver(addMessage)
    self.addCleanup(log.removeObserver, addMessage)

    def checkResponse(ignored):
        self.assertIn("ignoring malformed CGI header: 'XYZ'",
                      capturedText)

    deferred.addCallback(checkResponse)
    return deferred
def testReadInput(self):
    """
    Write the C{READINPUT_CGI} script to a temporary file, serve it, POST a
    body to it, and pass the response body to C{self._testReadInput_1}.
    """
    scriptPath = os.path.abspath(self.mktemp())
    with open(scriptPath, 'wt') as scriptFile:
        scriptFile.write(READINPUT_CGI)

    port = self.startServer(scriptPath)
    requestBody = client.FileBodyProducer(BytesIO(b"Here is your stdin"))
    deferred = client.Agent(reactor).request(
        method=b"POST",
        uri="http://localhost:%d/cgi" % (port,),
        bodyProducer=requestBody,
    )
    deferred.addCallback(client.readBody)
    deferred.addCallback(self._testReadInput_1)
    return deferred